/*

Copyright (c) 2003, Magnus Jonsson
Copyright (c) 2006-2020, Arvid Norberg
Copyright (c) 2009, Andrew Resch
Copyright (c) 2014-2020, Steven Siloti
Copyright (c) 2015-2020, Alden Torres
Copyright (c) 2015, Thomas
Copyright (c) 2015, Mikhail Titov
Copyright (c) 2016, Falcosc
Copyright (c) 2016-2017, Pavel Pimenov
Copyright (c) 2016-2017, Andrei Kurushin
Copyright (c) 2017, sledgehammer_999
Copyright (c) 2018, Xiyue Deng
Copyright (c) 2020, Fonic
Copyright (c) 2020, Rosen Penev
Copyright (c) 2020, Paul-Louis Ageneau
All rights reserved.

You may use, distribute and modify this code under the terms of the BSD license,
see LICENSE file.
*/

#include "libtorrent/config.hpp"

#include <ctime>
#include <algorithm>
#include <cctype>
#include <cstdio> // for snprintf
#include <cinttypes> // for PRId64 et.al.
#include <functional>
#include <type_traits>
#include <numeric> // for accumulate

#if TORRENT_USE_INVARIANT_CHECKS
#include <unordered_set>
#endif

#include "libtorrent/aux_/disable_warnings_push.hpp"
#include <boost/asio/ts/internet.hpp>
#include <boost/asio/ts/executor.hpp>
#include "libtorrent/aux_/disable_warnings_pop.hpp"

#include "libtorrent/ssl.hpp"
#include "libtorrent/peer_id.hpp"
#include "libtorrent/torrent_info.hpp"
#include "libtorrent/tracker_manager.hpp"
#include "libtorrent/bencode.hpp"
#include "libtorrent/hasher.hpp"
#include "libtorrent/entry.hpp"
#include "libtorrent/session.hpp"
#include "libtorrent/fingerprint.hpp"
#include "libtorrent/alert_types.hpp"
#include "libtorrent/aux_/invariant_check.hpp"
#include "libtorrent/aux_/bt_peer_connection.hpp"
#include "libtorrent/peer_connection_handle.hpp"
#include "libtorrent/ip_filter.hpp"
#include "libtorrent/socket.hpp"
#include "libtorrent/aux_/session_impl.hpp"
#ifndef TORRENT_DISABLE_DHT
#include "libtorrent/kademlia/dht_tracker.hpp"
#include "libtorrent/kademlia/types.hpp"
#include "libtorrent/kademlia/node_entry.hpp"
#endif
#include "libtorrent/enum_net.hpp"
#include "libtorrent/utf8.hpp"
#include "libtorrent/upnp.hpp"
#include "libtorrent/natpmp.hpp"
#include "libtorrent/lsd.hpp"
#include "libtorrent/aux_/instantiate_connection.hpp"
#include "libtorrent/peer_info.hpp"
#include "libtorrent/random.hpp"
#include "libtorrent/magnet_uri.hpp"
#include "libtorrent/aux_/session_settings.hpp"
#include "libtorrent/aux_/torrent_peer.hpp"
#include "libtorrent/torrent_handle.hpp"
#include "libtorrent/aux_/choker.hpp"
#include "libtorrent/error.hpp"
#include "libtorrent/platform_util.hpp"
#include "libtorrent/aux_/bind_to_device.hpp"
#include "libtorrent/hex.hpp" // to_hex, from_hex
#include "libtorrent/aux_/scope_end.hpp"
#include "libtorrent/aux_/set_socket_buffer.hpp"
#include "libtorrent/aux_/generate_peer_id.hpp"
#include "libtorrent/aux_/ffs.hpp"
#include "libtorrent/aux_/array.hpp"

#ifndef TORRENT_DISABLE_LOGGING

#include "libtorrent/socket_io.hpp"

// for logging stat layout
#include "libtorrent/aux_/stat.hpp"

#include <cstdarg> // for va_list

// for logging the size of DHT structures
#ifndef TORRENT_DISABLE_DHT
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/item.hpp>
#endif // TORRENT_DISABLE_DHT

#include "libtorrent/aux_/http_tracker_connection.hpp"
#include "libtorrent/aux_/udp_tracker_connection.hpp"

#endif // TORRENT_DISABLE_LOGGING

#ifdef TORRENT_USE_LIBGCRYPT

#if GCRYPT_VERSION_NUMBER < 0x010600
extern "C" {
GCRY_THREAD_OPTION_PTHREAD_IMPL;
}
#endif

namespace {

	// libgcrypt requires this to initialize the library
	struct gcrypt_setup
	{
		gcrypt_setup()
		{
			gcry_check_version(nullptr);
#if GCRYPT_VERSION_NUMBER < 0x010600
			gcry_error_t e = gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
			if (e != 0) std::fprintf(stderr, "libcrypt ERROR: %s\n", gcry_strerror(e));
			e = gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
			if (e != 0) std::fprintf(stderr, "initialization finished error: %s\n", gcry_strerror(e));
#endif
		}
	} gcrypt_global_constructor;
}

#endif // TORRENT_USE_LIBGCRYPT

#ifdef TORRENT_USE_OPENSSL
#ifdef TORRENT_WINDOWS
#include <wincrypt.h>
#endif
#endif // TORRENT_USE_OPENSSL

#ifdef TORRENT_WINDOWS
// for ERROR_SEM_TIMEOUT
#include <winerror.h>
#endif

using namespace std::placeholders;

#ifdef BOOST_NO_EXCEPTIONS
namespace boost {

	void throw_exception(std::exception const& e) { std::abort(); }
}
#endif

namespace libtorrent {

#if defined TORRENT_ASIO_DEBUGGING
	std::map<std::string, async_t> _async_ops;
	std::deque<wakeup_t> _wakeups;
	int _async_ops_nthreads = 0;
	std::mutex _async_ops_mutex;

	std::map<int, handler_alloc_t> _handler_storage;
	std::mutex _handler_storage_mutex;
	bool _handler_logger_registered = false;
#endif

namespace aux {

void apply_deprecated_dht_settings(settings_pack& sett, bdecode_node const& s)
{
	bdecode_node val;
	val = s.dict_find_int("max_peers_reply");
	if (val) sett.set_int(settings_pack::dht_max_peers_reply, int(val.int_value()));
	val = s.dict_find_int("search_branching");
	if (val) sett.set_int(settings_pack::dht_search_branching, int(val.int_value()));
	val = s.dict_find_int("max_fail_count");
	if (val) sett.set_int(settings_pack::dht_max_fail_count, int(val.int_value()));
	val = s.dict_find_int("max_torrents");
	if (val) sett.set_int(settings_pack::dht_max_torrents, int(val.int_value()));
	val = s.dict_find_int("max_dht_items");
	if (val) sett.set_int(settings_pack::dht_max_dht_items, int(val.int_value()));
	val = s.dict_find_int("max_peers");
	if (val) sett.set_int(settings_pack::dht_max_peers, int(val.int_value()));
	val = s.dict_find_int("max_torrent_search_reply");
	if (val) sett.set_int(settings_pack::dht_max_torrent_search_reply, int(val.int_value()));
	val = s.dict_find_int("restrict_routing_ips");
	if (val) sett.set_bool(settings_pack::dht_restrict_routing_ips, (val.int_value() != 0));
	val = s.dict_find_int("restrict_search_ips");
	if (val) sett.set_bool(settings_pack::dht_restrict_search_ips, (val.int_value() != 0));
	val = s.dict_find_int("extended_routing_table");
	if (val) sett.set_bool(settings_pack::dht_extended_routing_table, (val.int_value() != 0));
	val = s.dict_find_int("aggressive_lookups");
	if (val) sett.set_bool(settings_pack::dht_aggressive_lookups, (val.int_value() != 0));
	val = s.dict_find_int("privacy_lookups");
	if (val) sett.set_bool(settings_pack::dht_privacy_lookups, (val.int_value() != 0));
	val = s.dict_find_int("enforce_node_id");
	if (val) sett.set_bool(settings_pack::dht_enforce_node_id, (val.int_value() != 0));
	val = s.dict_find_int("ignore_dark_internet");
	if (val) sett.set_bool(settings_pack::dht_ignore_dark_internet, (val.int_value() != 0));
	val = s.dict_find_int("block_timeout");
	if (val) sett.set_int(settings_pack::dht_block_timeout, int(val.int_value()));
	val = s.dict_find_int("block_ratelimit");
	if (val) sett.set_int(settings_pack::dht_block_ratelimit, int(val.int_value()));
	val = s.dict_find_int("read_only");
	if (val) sett.set_bool(settings_pack::dht_read_only, (val.int_value() != 0));
	val = s.dict_find_int("item_lifetime");
	if (val) sett.set_int(settings_pack::dht_item_lifetime, int(val.int_value()));
}

	std::vector<std::shared_ptr<listen_socket_t>>::iterator partition_listen_sockets(
		std::vector<listen_endpoint_t>& eps
		, std::vector<std::shared_ptr<listen_socket_t>>& sockets)
	{
		return std::partition(sockets.begin(), sockets.end()
			, [&eps](std::shared_ptr<listen_socket_t> const& sock)
		{
			auto match = std::find_if(eps.begin(), eps.end()
				, [&sock](listen_endpoint_t const& ep)
			{
				return ep.ssl == sock->ssl
					&& ep.port == sock->original_port
					&& ep.device == sock->device
					&& ep.flags == sock->flags
					&& ep.addr == sock->local_endpoint.address();
			});

			if (match != eps.end())
			{
				// remove the matched endpoint so that another socket can't match it
				// this also signals to the caller that it doesn't need to create a
				// socket for the endpoint
				eps.erase(match);
				return true;
			}
			else
			{
				return false;
			}
		});
	}

	// To comply with BEP 45 multi homed clients must run separate DHT nodes
	// on each interface they use to talk to the DHT. This is enforced
	// by prohibiting creating a listen socket on [::] and 0.0.0.0. Instead the list of
	// interfaces is enumerated and sockets are created for each of them.
	void expand_unspecified_address(span<ip_interface const> const ifs
		, span<ip_route const> const routes
		, std::vector<listen_endpoint_t>& eps)
	{
		auto unspecified_begin = std::partition(eps.begin(), eps.end()
			, [](listen_endpoint_t const& ep) { return !ep.addr.is_unspecified(); });
		std::vector<listen_endpoint_t> unspecified_eps(unspecified_begin, eps.end());
		eps.erase(unspecified_begin, eps.end());
		for (auto const& uep : unspecified_eps)
		{
			bool const v4 = uep.addr.is_v4();
			for (auto const& ipface : ifs)
			{
				if (!ipface.preferred)
					continue;
				if (ipface.interface_address.is_v4() != v4)
					continue;
				if (!uep.device.empty() && uep.device != ipface.name)
					continue;
				if (std::any_of(eps.begin(), eps.end(), [&](listen_endpoint_t const& e)
				{
					// ignore device name because we don't want to create
					// duplicates if the user explicitly configured an address
					// without a device name
					return e.addr == ipface.interface_address
						&& e.port == uep.port
						&& e.ssl == uep.ssl;
				}))
				{
					continue;
				}

				// ignore interfaces that are down
				if (ipface.state != if_state::up && ipface.state != if_state::unknown)
					continue;
				if (!(ipface.flags & if_flags::up))
					continue;

				// we assume this listen_socket_t is local-network under some
				// conditions, meaning we won't announce it to internet trackers
				bool const local
					= ipface.interface_address.is_loopback()
					|| is_link_local(ipface.interface_address)
					|| (ipface.flags & if_flags::loopback)
					|| (!is_global(ipface.interface_address)
						&& !(ipface.flags & if_flags::pointopoint)
						&& !has_internet_route(ipface.name, family(ipface.interface_address), routes));

				eps.emplace_back(ipface.interface_address, uep.port, uep.device
					, uep.ssl, uep.flags | listen_socket_t::was_expanded
					| (local ? listen_socket_t::local_network : listen_socket_flags_t{}));
			}
		}
	}

	void expand_devices(span<ip_interface const> const ifs
		, std::vector<listen_endpoint_t>& eps)
	{
		for (auto& ep : eps)
		{
			auto const iface = ep.device.empty()
				? std::find_if(ifs.begin(), ifs.end(), [&](ip_interface const& ipface)
					{
						return match_addr_mask(ipface.interface_address, ep.addr, ipface.netmask);
					})
				: std::find_if(ifs.begin(), ifs.end(), [&](ip_interface const& ipface)
					{
						return ipface.name == ep.device
							&& match_addr_mask(ipface.interface_address, ep.addr, ipface.netmask);
					});

			if (iface == ifs.end())
			{
				// we can't find which device this is for, just assume we can't
				// reach anything on it
				ep.netmask = build_netmask(0, ep.addr.is_v4() ? AF_INET : AF_INET6);
				continue;
			}

			ep.netmask = iface->netmask;
			ep.device = iface->name;
		}
	}

	bool listen_socket_t::can_route(address const& addr) const
	{
		// if this is a proxy, we assume it can reach everything
		if (flags & proxy) return true;

		if (is_v4(local_endpoint) != addr.is_v4()) return false;

		if (local_endpoint.address().is_v6()
			&& local_endpoint.address().to_v6().scope_id() != addr.to_v6().scope_id())
			return false;

		if (local_endpoint.address() == addr) return true;
		if (local_endpoint.address().is_unspecified()) return true;
		if (match_addr_mask(addr, local_endpoint.address(), netmask)) return true;
		return !(flags & local_network);
	}

	void session_impl::init_peer_class_filter(bool unlimited_local)
	{
		// set the default peer_class_filter to use the local peer class
		// for peers on local networks
		std::uint32_t lfilter = 1 << static_cast<std::uint32_t>(m_local_peer_class);
		std::uint32_t gfilter = 1 << static_cast<std::uint32_t>(m_global_class);

		struct class_mapping
		{
			char const* first;
			char const* last;
			std::uint32_t filter;
		};

		static const class_mapping v4_classes[] =
		{
			// everything
			{"0.0.0.0", "255.255.255.255", gfilter},
			// local networks
			{"10.0.0.0", "10.255.255.255", lfilter},
			{"172.16.0.0", "172.31.255.255", lfilter},
			{"192.168.0.0", "192.168.255.255", lfilter},
			// link-local
			{"169.254.0.0", "169.254.255.255", lfilter},
			// loop-back
			{"127.0.0.0", "127.255.255.255", lfilter},
		};

		static const class_mapping v6_classes[] =
		{
			// everything
			{"::0", "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", gfilter},
			// local networks
			{"fc00::", "fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", lfilter},
			// link-local
			{"fe80::", "febf::ffff:ffff:ffff:ffff:ffff:ffff:ffff", lfilter},
			// loop-back
			{"::1", "::1", lfilter},
		};

		class_mapping const* p = v4_classes;
		int len = sizeof(v4_classes) / sizeof(v4_classes[0]);
		if (!unlimited_local) len = 1;
		for (int i = 0; i < len; ++i)
		{
			error_code ec;
			address_v4 begin = make_address_v4(p[i].first, ec);
			address_v4 end = make_address_v4(p[i].last, ec);
			if (ec) continue;
			m_peer_class_filter.add_rule(begin, end, p[i].filter);
		}
		p = v6_classes;
		len = sizeof(v6_classes) / sizeof(v6_classes[0]);
		if (!unlimited_local) len = 1;
		for (int i = 0; i < len; ++i)
		{
			error_code ec;
			address_v6 begin = make_address_v6(p[i].first, ec);
			address_v6 end = make_address_v6(p[i].last, ec);
			if (ec) continue;
			m_peer_class_filter.add_rule(begin, end, p[i].filter);
		}
	}

#ifdef TORRENT_SSL_PEERS
	namespace {
	// when running bittorrent over SSL, the SNI (server name indication)
	// extension is used to know which torrent the incoming connection is
	// trying to connect to. The 40 first bytes in the name is expected to
	// be the hex encoded info-hash
	bool ssl_server_name_callback_impl(ssl::stream_handle_type stream_handle, std::string const& name, session_impl* si)
	{
		if (name.size() < 40)
			return false;

		info_hash_t info_hash;
		bool valid = aux::from_hex({name.c_str(), 40}, info_hash.v1.data());

		// the server name is not a valid hex-encoded info-hash
		if (!valid)
			return false;

		// see if there is a torrent with this info-hash
		std::shared_ptr<torrent> t = si ? si->find_torrent(info_hash).lock() : nullptr;

		// if there isn't, fail
		if (!t) return false;

		// if the torrent we found isn't an SSL torrent, also fail.
		if (!t->is_ssl_torrent()) return false;

		// if the torrent doesn't have an SSL context and should not allow
		// incoming SSL connections
		auto* torrent_ctx = t->ssl_ctx();
		if (!torrent_ctx) return false;

		// use this torrent's certificate
		ssl::set_context(stream_handle, ssl::get_handle(*torrent_ctx));
		return true;
	}

#if defined TORRENT_USE_OPENSSL
int ssl_server_name_callback(SSL* s, int*, void* arg)
{
	char const* name = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name);
	auto* si = reinterpret_cast<session_impl*>(arg);
	return ssl_server_name_callback_impl(s, name ? std::string(name) : "", si)
			? SSL_TLSEXT_ERR_OK
			: SSL_TLSEXT_ERR_ALERT_FATAL;
}
#elif defined TORRENT_USE_GNUTLS
bool ssl_server_name_callback(ssl::stream_handle_type stream_handle, std::string const& name, void* arg)
{
	auto* si = reinterpret_cast<session_impl*>(arg);
	return ssl_server_name_callback_impl(stream_handle, name, si);
}
#endif
	} // anonymous namespace
#endif // TORRENT_SSL_PEERS

	session_impl::session_impl(io_context& ioc, settings_pack const& pack
		, disk_io_constructor_type disk_io_constructor
		, session_flags_t const flags)
		: m_settings(pack)
		, m_io_context(ioc)
#if TORRENT_USE_SSL
		, m_ssl_ctx(ssl::context::tls_client)
#ifdef TORRENT_SSL_PEERS
		, m_peer_ssl_ctx(ssl::context::tls)
#endif
#endif // TORRENT_USE_SSL
		, m_alerts(m_settings.get_int(settings_pack::alert_queue_size)
			, alert_category_t{static_cast<unsigned int>(m_settings.get_int(settings_pack::alert_mask))})
		, m_disk_thread((disk_io_constructor ? disk_io_constructor : default_disk_io_constructor)
			(m_io_context, m_settings, m_stats_counters))
		, m_download_rate(peer_connection::download_channel)
		, m_upload_rate(peer_connection::upload_channel)
		, m_host_resolver(m_io_context)
		, m_tracker_manager(
			std::bind(&session_impl::send_udp_packet_listen, this, _1, _2, _3, _4, _5)
			, std::bind(&session_impl::send_udp_packet_hostname_listen, this, _1, _2, _3, _4, _5, _6)
			, m_stats_counters
			, m_host_resolver
			, m_settings
#if !defined TORRENT_DISABLE_LOGGING || TORRENT_USE_ASSERTS
			, *this
#endif
			)
		, m_work(make_work_guard(m_io_context))
#if TORRENT_USE_I2P
		, m_i2p_conn(m_io_context)
#endif
		, m_created(clock_type::now())
		, m_last_tick(m_created)
		, m_last_second_tick(m_created - milliseconds(900))
		, m_last_choke(m_created)
		, m_last_auto_manage(m_created)
#ifndef TORRENT_DISABLE_DHT
		, m_dht_announce_timer(m_io_context)
#endif
		, m_utp_socket_manager(
			std::bind(&session_impl::send_udp_packet, this, _1, _2, _3, _4, _5)
			, [this](socket_type s) { this->incoming_connection(std::move(s)); }
			, m_io_context
			, m_settings, m_stats_counters, nullptr)
#ifdef TORRENT_SSL_PEERS
		, m_ssl_utp_socket_manager(
			std::bind(&session_impl::send_udp_packet, this, _1, _2, _3, _4, _5)
			, std::bind(&session_impl::on_incoming_utp_ssl, this, _1)
			, m_io_context
			, m_settings, m_stats_counters
			, &m_peer_ssl_ctx)
#endif
		, m_timer(m_io_context)
		, m_lsd_announce_timer(m_io_context)
		, m_close_file_timer(m_io_context)
		, m_paused(flags & session::paused)
	{
	}

	template <typename Fun, typename... Args>
	void session_impl::wrap(Fun f, Args&&... a)
#ifndef BOOST_NO_EXCEPTIONS
	try
#endif
	{
		(this->*f)(std::forward<Args>(a)...);
	}
#ifndef BOOST_NO_EXCEPTIONS
	catch (system_error const& e) {
		alerts().emplace_alert<session_error_alert>(e.code(), e.what());
		pause();
	} catch (std::exception const& e) {
		alerts().emplace_alert<session_error_alert>(error_code(), e.what());
		pause();
	} catch (...) {
		alerts().emplace_alert<session_error_alert>(error_code(), "unknown error");
		pause();
	}
#endif

	// This function is called by the creating thread, not in the message loop's
	// io_context thread.
	// TODO: 2 is there a reason not to move all of this into init()? and just
	// post it to the io_context?
	void session_impl::start_session()
	{
#ifndef TORRENT_DISABLE_LOGGING
		session_log("start session");
#endif

#if TORRENT_USE_SSL
		error_code ec;
		m_ssl_ctx.set_default_verify_paths(ec);
#if defined TORRENT_WINDOWS && defined TORRENT_USE_OPENSSL
		// TODO: come up with some abstraction to do this for gnutls as well
		// load certificates from the windows system certificate store
		X509_STORE* store = X509_STORE_new();
		if (store)
		{
			HCERTSTORE system_store = CertOpenSystemStoreA(0, "ROOT");
			// this is best effort
			if (system_store)
			{
				CERT_CONTEXT const* ctx = nullptr;
				while ((ctx = CertEnumCertificatesInStore(system_store, ctx)) != nullptr)
				{
					unsigned char const* cert_ptr = reinterpret_cast<unsigned char const*>(ctx->pbCertEncoded);
					X509* x509 = d2i_X509(nullptr, &cert_ptr, ctx->cbCertEncoded);
					// this is best effort
					if (!x509) continue;
					X509_STORE_add_cert(store, x509);
					X509_free(x509);
				}
				CertFreeCertificateContext(ctx);
				CertCloseStore(system_store, 0);
			}
		}

		SSL_CTX* ssl_ctx = m_ssl_ctx.native_handle();
		SSL_CTX_set_cert_store(ssl_ctx, store);
#endif
#endif
#ifdef TORRENT_SSL_PEERS
		m_peer_ssl_ctx.set_verify_mode(ssl::context::verify_none, ec);
		ssl::set_server_name_callback(ssl::get_handle(m_peer_ssl_ctx), ssl_server_name_callback, this, ec);
#endif // TORRENT_SSL_PEERS

#ifndef TORRENT_DISABLE_DHT
		m_next_dht_torrent = 0;
#endif
		m_next_lsd_torrent = 0;

		m_global_class = m_classes.new_peer_class("global");
		m_tcp_peer_class = m_classes.new_peer_class("tcp");
		m_local_peer_class = m_classes.new_peer_class("local");
		// local peers are always unchoked
		m_classes.at(m_local_peer_class)->ignore_unchoke_slots = true;
		// local peers are allowed to exceed the normal connection
		// limit by 50%
		m_classes.at(m_local_peer_class)->connection_limit_factor = 150;

		TORRENT_ASSERT(m_global_class == session::global_peer_class_id);
		TORRENT_ASSERT(m_tcp_peer_class == session::tcp_peer_class_id);
		TORRENT_ASSERT(m_local_peer_class == session::local_peer_class_id);

		init_peer_class_filter(true);

		// TCP, SSL/TCP and I2P connections should be assigned the TCP peer class
		m_peer_class_type_filter.add(peer_class_type_filter::tcp_socket, m_tcp_peer_class);
		m_peer_class_type_filter.add(peer_class_type_filter::ssl_tcp_socket, m_tcp_peer_class);
		m_peer_class_type_filter.add(peer_class_type_filter::i2p_socket, m_tcp_peer_class);

#ifndef TORRENT_DISABLE_LOGGING

		session_log("version: %s revision: %" PRIx64
			, lt::version_str, lt::version_revision);

#endif // TORRENT_DISABLE_LOGGING

		// ---- auto-cap max connections ----
		int const max_files = max_open_files();
		// deduct some margin for epoll/kqueue, log files,
		// futexes, shared objects etc.
		// 80% of the available file descriptors should go to connections
		m_settings.set_int(settings_pack::connections_limit, std::min(
			m_settings.get_int(settings_pack::connections_limit)
			, std::max(5, (max_files - 20) * 8 / 10)));
		// 20% goes towards regular files (see disk_io_thread)
#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log("max-connections: %d max-files: %d"
				, m_settings.get_int(settings_pack::connections_limit)
				, max_files);
		}
#endif

		post(m_io_context, [this] { wrap(&session_impl::init); });
	}

	void session_impl::init()
	{
		// this is a debug facility
		// see single_threaded in debug.hpp
		thread_started();

		TORRENT_ASSERT(is_single_thread());

#ifndef TORRENT_DISABLE_LOGGING
		session_log(" *** session thread init");
#endif

		// this is where we should set up all async operations. This
		// is called from within the network thread as opposed to the
		// constructor which is called from the main thread

#if defined TORRENT_ASIO_DEBUGGING
		async_inc_threads();
		add_outstanding_async("session_impl::on_tick");
#endif
		post(m_io_context, [this]{ wrap(&session_impl::on_tick, error_code()); });

		int const lsd_announce_interval
			= m_settings.get_int(settings_pack::local_service_announce_interval);
		int const delay = std::max(lsd_announce_interval
			/ std::max(static_cast<int>(m_torrents.size()), 1), 1);
		m_lsd_announce_timer.expires_after(seconds(delay));
		ADD_OUTSTANDING_ASYNC("session_impl::on_lsd_announce");
		m_lsd_announce_timer.async_wait([this](error_code const& e) {
			wrap(&session_impl::on_lsd_announce, e); } );

#ifndef TORRENT_DISABLE_LOGGING
		session_log(" done starting session");
#endif

		// this applies unchoke settings from m_settings
		recalculate_unchoke_slots();

		// apply all m_settings to this session
		run_all_updates(*this);
		reopen_listen_sockets(false);

#if TORRENT_USE_INVARIANT_CHECKS
		check_invariant();
#endif
	}

#if TORRENT_ABI_VERSION <= 2
	// TODO: 2 the ip filter should probably be saved here too
	void session_impl::save_state(entry* eptr, save_state_flags_t const flags) const
	{
		TORRENT_ASSERT(is_single_thread());

		entry& e = *eptr;
		// make it a dict
		e.dict();

		if (flags & session::save_settings)
		{
			entry::dictionary_type& sett = e["settings"].dict();
			save_settings_to_dict(non_default_settings(m_settings), sett);
		}

#ifndef TORRENT_DISABLE_DHT
		if (flags & session::save_dht_settings)
		{
			e["dht"] = dht::save_dht_settings(get_dht_settings());
		}

		if (m_dht && (flags & session::save_dht_state))
		{
			e["dht state"] = dht::save_dht_state(m_dht->state());
		}
#endif

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto const& ext : m_ses_extensions[plugins_all_idx])
		{
			ext->save_state(*eptr);
		}
#endif
	}

	void session_impl::load_state(bdecode_node const* e
		, save_state_flags_t const flags)
	{
		TORRENT_ASSERT(is_single_thread());

		bdecode_node settings;
		if (e->type() != bdecode_node::dict_t) return;

#ifndef TORRENT_DISABLE_DHT
		bool need_update_dht = false;
		if (flags & session_handle::save_dht_state)
		{
			settings = e->dict_find_dict("dht state");
			if (settings)
			{
				m_dht_state = dht::read_dht_state(settings);
				need_update_dht = true;
			}
		}
#endif

#if TORRENT_ABI_VERSION == 1
		bool need_update_proxy = false;
		if (flags & session_handle::save_proxy)
		{
			settings = e->dict_find_dict("proxy");
			if (settings)
			{
				m_settings.bulk_set([&settings](session_settings_single_thread& s)
				{
					bdecode_node val;
					val = settings.dict_find_int("port");
					if (val) s.set_int(settings_pack::proxy_port, int(val.int_value()));
					val = settings.dict_find_int("type");
					if (val) s.set_int(settings_pack::proxy_type, int(val.int_value()));
					val = settings.dict_find_int("proxy_hostnames");
					if (val) s.set_bool(settings_pack::proxy_hostnames, val.int_value() != 0);
					val = settings.dict_find_int("proxy_peer_connections");
					if (val) s.set_bool(settings_pack::proxy_peer_connections, val.int_value() != 0);
					val = settings.dict_find_string("hostname");
					if (val) s.set_str(settings_pack::proxy_hostname, std::string(val.string_value()));
					val = settings.dict_find_string("password");
					if (val) s.set_str(settings_pack::proxy_password, std::string(val.string_value()));
					val = settings.dict_find_string("username");
					if (val) s.set_str(settings_pack::proxy_username, std::string(val.string_value()));
				});
				need_update_proxy = true;
			}
		}

		settings = e->dict_find_dict("encryption");
		if (settings)
		{
			m_settings.bulk_set([&settings](session_settings_single_thread& s)
			{
				bdecode_node val;
				val = settings.dict_find_int("prefer_rc4");
				if (val) s.set_bool(settings_pack::prefer_rc4, val.int_value() != 0);
				val = settings.dict_find_int("out_enc_policy");
				if (val) s.set_int(settings_pack::out_enc_policy, int(val.int_value()));
				val = settings.dict_find_int("in_enc_policy");
				if (val) s.set_int(settings_pack::in_enc_policy, int(val.int_value()));
				val = settings.dict_find_int("allowed_enc_level");
				if (val) s.set_int(settings_pack::allowed_enc_level, int(val.int_value()));
			});
		}
#endif

		if ((flags & session_handle::save_settings)
			|| (flags & session_handle::save_dht_settings)
			)
		{
			settings = e->dict_find_dict("settings");
			if (settings)
			{
				// apply_settings_pack will update dht and proxy
				settings_pack pack = load_pack_from_dict(settings);

				// these settings are not loaded from state
				// they are set by the client software, not configured by users
				pack.clear(settings_pack::user_agent);
				pack.clear(settings_pack::peer_fingerprint);

				apply_settings_pack_impl(pack);
#ifndef TORRENT_DISABLE_DHT
				need_update_dht = false;
#endif
#if TORRENT_ABI_VERSION == 1
				need_update_proxy = false;
#endif
			}
		}

		if (flags & session_handle::save_dht_settings)
		{
			// This is here for backwards compatibility, to support loading state
			// files in the previous file format, where the DHT settings were in
			// its own dictionary
			settings = e->dict_find_dict("dht");
			if (settings)
			{
				settings_pack sett;
				aux::apply_deprecated_dht_settings(sett, settings);
				apply_settings_pack_impl(sett);
			}
		}

#ifndef TORRENT_DISABLE_DHT
		if (need_update_dht) start_dht();
#endif
#if TORRENT_ABI_VERSION == 1
		if (need_update_proxy) update_proxy();
#endif

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto& ext : m_ses_extensions[plugins_all_idx])
		{
			ext->load_state(*e);
		}
#endif
	}
#endif

	session_params session_impl::session_state(save_state_flags_t const flags) const
	{
		TORRENT_ASSERT(is_single_thread());

		session_params ret;
		if (flags & session::save_settings)
			ret.settings = non_default_settings(m_settings);

#ifndef TORRENT_DISABLE_DHT
#if TORRENT_ABI_VERSION <= 2
	if (flags & session_handle::save_dht_settings)
	{
		ret.dht_settings = get_dht_settings();
	}
#endif

		if (m_dht && (flags & session::save_dht_state))
			ret.dht_state = m_dht->state();
#endif

#ifndef TORRENT_DISABLE_EXTENSIONS
		if (flags & session::save_extension_state)
		{
			for (auto const& ext : m_ses_extensions[plugins_all_idx])
			{
				auto state = ext->save_state();
				for (auto& v : state)
					ret.ext_state[std::move(v.first)] = std::move(v.second);
			}
		}
#endif

		if ((flags & session::save_ip_filter) && m_ip_filter)
		{
			ret.ip_filter = *m_ip_filter;
		}
		return ret;
	}

	proxy_settings session_impl::proxy() const
	{
		return proxy_settings(m_settings);
	}

#ifndef TORRENT_DISABLE_EXTENSIONS

	void session_impl::add_extension(ext_function_t ext)
	{
		TORRENT_ASSERT(is_single_thread());
		TORRENT_ASSERT(ext);

		add_ses_extension(std::make_shared<session_plugin_wrapper>(ext));
	}

	void session_impl::add_ses_extension(std::shared_ptr<plugin> ext)
	{
		// this is called during startup of the session, from the thread creating
		// it, not its own thread
//		TORRENT_ASSERT(is_single_thread());
		TORRENT_ASSERT_VAL(ext, ext);

		feature_flags_t const features = ext->implemented_features();

		m_ses_extensions[plugins_all_idx].push_back(ext);

		if (features & plugin::optimistic_unchoke_feature)
			m_ses_extensions[plugins_optimistic_unchoke_idx].push_back(ext);
		if (features & plugin::tick_feature)
			m_ses_extensions[plugins_tick_idx].push_back(ext);
		if (features & plugin::dht_request_feature)
			m_ses_extensions[plugins_dht_request_idx].push_back(ext);
		if (features & plugin::alert_feature)
			m_alerts.add_extension(ext);
		session_handle h(shared_from_this());
		ext->added(h);
	}

#endif // TORRENT_DISABLE_EXTENSIONS

	void session_impl::pause()
	{
		TORRENT_ASSERT(is_single_thread());

		if (m_paused) return;
#ifndef TORRENT_DISABLE_LOGGING
		session_log(" *** session paused ***");
#endif
		// this will abort all tracker announces other than event=stopped
		m_tracker_manager.abort_all_requests();

		m_paused = true;
		for (auto& te : m_torrents)
		{
			te->set_session_paused(true);
		}
	}

	void session_impl::resume()
	{
		TORRENT_ASSERT(is_single_thread());

		if (!m_paused) return;
		m_paused = false;

		for (auto& te : m_torrents)
		{
			te->set_session_paused(false);
		}
	}

	void session_impl::abort() noexcept
	{
		TORRENT_ASSERT(is_single_thread());

		if (m_abort) return;
#ifndef TORRENT_DISABLE_LOGGING
		session_log(" *** ABORT CALLED ***");
#endif

		// at this point we cannot call the notify function anymore, since the
		// session will become invalid.
		m_alerts.set_notify_function({});

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto& ext : m_ses_extensions[plugins_all_idx])
		{
			ext->abort();
		}
#endif

		// this will cancel requests that are not critical for shutting down
		// cleanly. i.e. essentially tracker hostname lookups that we're not
		// about to send event=stopped to
		m_host_resolver.abort();

		m_close_file_timer.cancel();

		// abort the main thread
		m_abort = true;
		error_code ec;

		// we rely on on_tick() during shutdown, but we don't need to wait a
		// whole second for it to fire
		m_timer.cancel();

#if TORRENT_USE_I2P
		m_i2p_conn.close(ec);
#endif
		stop_ip_notifier();
		stop_lsd();
		stop_upnp();
		stop_natpmp();
#ifndef TORRENT_DISABLE_DHT
		stop_dht();
		m_dht_announce_timer.cancel();
#endif
		m_lsd_announce_timer.cancel();

#ifdef TORRENT_SSL_PEERS
		for (auto const& s : m_incoming_sockets)
		{
			s->close(ec);
			TORRENT_ASSERT(!ec);
		}
		m_incoming_sockets.clear();
#endif

#if TORRENT_USE_I2P
		if (m_i2p_listen_socket && m_i2p_listen_socket->is_open())
		{
			m_i2p_listen_socket->close(ec);
			TORRENT_ASSERT(!ec);
		}
		m_i2p_listen_socket.reset();
#endif

#ifndef TORRENT_DISABLE_LOGGING
		session_log(" aborting all torrents (%d)", int(m_torrents.size()));
#endif
		// abort all torrents
		for (auto const& te : m_torrents)
		{
			te->abort();
		}
		m_torrents.clear();
		m_stats_counters.set_value(counters::num_peers_up_unchoked_all, 0);
		m_stats_counters.set_value(counters::num_peers_up_unchoked, 0);
		m_stats_counters.set_value(counters::num_peers_up_unchoked_optimistic, 0);

#ifndef TORRENT_DISABLE_LOGGING
		session_log(" aborting all tracker requests");
#endif
		m_tracker_manager.stop();

#ifndef TORRENT_DISABLE_LOGGING
		session_log(" aborting all connections (%d)", int(m_connections.size()));
#endif
		// abort all connections
		for (auto i = m_connections.begin(); i != m_connections.end();)
		{
			peer_connection* p = (*i).get();
			++i;
			p->disconnect(errors::stopping_torrent, operation_t::bittorrent);
		}

		// close the listen sockets
		for (auto const& l : m_listen_sockets)
		{
			if (l->sock)
			{
				l->sock->close(ec);
				TORRENT_ASSERT(!ec);
			}

			// TODO: 3 closing the udp sockets here means that
			// the uTP connections cannot be closed gracefully
			if (l->udp_sock)
			{
				l->udp_sock->sock.close();
			}
		}

		// we need to give all the sockets an opportunity to actually have their handlers
		// called and cancelled before we continue the shutdown. This is a bit
		// complicated, if there are no "undead" peers, it's safe to resume the
		// shutdown, but if there are, we have to wait for them to be cleared out
		// first. In session_impl::on_tick() we check them periodically. If we're
		// shutting down and we remove the last one, we'll initiate
		// shutdown_stage2 from there.
		if (m_undead_peers.empty())
		{
			post(m_io_context, make_handler([this] { abort_stage2(); }
				, m_abort_handler_storage, *this));
		}
	}

	void session_impl::abort_stage2() noexcept
	{
		m_download_rate.close();
		m_upload_rate.close();

		// it's OK to detach the threads here. The disk_io_thread
		// has an internal counter and won't release the network
		// thread until they're all dead (via m_work).
		m_disk_thread->abort(false);

		// now it's OK for the network thread to exit
		m_work.reset();
	}

	bool session_impl::has_connection(peer_connection* p) const
	{
		return m_connections.find(p->self()) != m_connections.end();
	}

	void session_impl::insert_peer(std::shared_ptr<peer_connection> const& c)
	{
		TORRENT_ASSERT(!c->m_in_constructor);

		// removing a peer may not throw an exception, so prepare for this
		// connection to be added to the undead peers now.
		m_undead_peers.reserve(m_undead_peers.size() + m_connections.size() + 1);
		m_connections.insert(c);

		TORRENT_ASSERT_VAL(m_undead_peers.capacity() >= m_connections.size()
			, m_undead_peers.capacity());
	}

	void session_impl::set_port_filter(port_filter const& f)
	{
		m_port_filter = f;
		if (m_settings.get_bool(settings_pack::no_connect_privileged_ports))
			m_port_filter.add_rule(0, 1024, port_filter::blocked);
		// Close connections whose endpoint is filtered
		// by the new ip-filter
		for (auto const& t : m_torrents)
			t->port_filter_updated();
	}

	void session_impl::set_ip_filter(std::shared_ptr<ip_filter> f)
	{
		INVARIANT_CHECK;

		m_ip_filter = std::move(f);

		// Close connections whose endpoint is filtered
		// by the new ip-filter
		for (auto& i : m_torrents)
			i->set_ip_filter(m_ip_filter);
	}

	void session_impl::ban_ip(address addr)
	{
		TORRENT_ASSERT(is_single_thread());
		if (!m_ip_filter) m_ip_filter = std::make_shared<ip_filter>();
		m_ip_filter->add_rule(addr, addr, ip_filter::blocked);
		for (auto& i : m_torrents)
			i->set_ip_filter(m_ip_filter);
	}

	ip_filter const& session_impl::get_ip_filter()
	{
		TORRENT_ASSERT(is_single_thread());
		if (!m_ip_filter) m_ip_filter = std::make_shared<ip_filter>();
		return *m_ip_filter;
	}

	port_filter const& session_impl::get_port_filter() const
	{
		TORRENT_ASSERT(is_single_thread());
		return m_port_filter;
	}

	peer_class_t session_impl::create_peer_class(char const* name)
	{
		TORRENT_ASSERT(is_single_thread());
		return m_classes.new_peer_class(name);
	}

	void session_impl::delete_peer_class(peer_class_t const cid)
	{
		TORRENT_ASSERT(is_single_thread());
		// if you hit this assert, you're deleting a non-existent peer class
		TORRENT_ASSERT_PRECOND(m_classes.at(cid));
		if (m_classes.at(cid) == nullptr) return;
		m_classes.decref(cid);
	}

	peer_class_info session_impl::get_peer_class(peer_class_t const cid) const
	{
		peer_class_info ret{};
		peer_class const* pc = m_classes.at(cid);
		// if you hit this assert, you're passing in an invalid cid
		TORRENT_ASSERT_PRECOND(pc);
		if (pc == nullptr)
		{
#if TORRENT_USE_INVARIANT_CHECKS
			// make it obvious that the return value is undefined
			ret.upload_limit = 0xf0f0f0f;
			ret.download_limit = 0xf0f0f0f;
			ret.label.resize(20);
			url_random(span<char>(ret.label));
			ret.ignore_unchoke_slots = false;
			ret.connection_limit_factor = 0xf0f0f0f;
			ret.upload_priority = 0xf0f0f0f;
			ret.download_priority = 0xf0f0f0f;
#endif
			return ret;
		}

		pc->get_info(&ret);
		return ret;
	}

namespace {

	std::uint16_t make_announce_port(std::uint16_t const p)
	{ return p == 0 ? 1 : p; }
}

	void session_impl::queue_tracker_request(tracker_request req
		, std::weak_ptr<request_callback> c)
	{
		req.listen_port = 0;
#if TORRENT_USE_I2P
		if (!m_settings.get_str(settings_pack::i2p_hostname).empty())
		{
			req.i2pconn = &m_i2p_conn;
		}
#endif

#if TORRENT_USE_SSL
#ifdef TORRENT_SSL_PEERS
		bool const use_ssl = req.ssl_ctx != nullptr && req.ssl_ctx != &m_ssl_ctx;
		if (!use_ssl)
#endif
			req.ssl_ctx = &m_ssl_ctx;
#endif

		TORRENT_ASSERT(req.outgoing_socket);
		auto* ls = req.outgoing_socket.get();

		req.listen_port =
#if TORRENT_USE_I2P
			(req.kind == tracker_request::i2p) ? 1 :
#endif
#ifdef TORRENT_SSL_PEERS
			// SSL torrents use the SSL listen port
			use_ssl ? make_announce_port(ssl_listen_port(ls)) :
#endif
			make_announce_port(listen_port(ls));
		m_tracker_manager.queue_request(get_context(), std::move(req), m_settings, c);
	}

	void session_impl::set_peer_class(peer_class_t const cid, peer_class_info const& pci)
	{
		peer_class* pc = m_classes.at(cid);
		// if you hit this assert, you're passing in an invalid cid
		TORRENT_ASSERT_PRECOND(pc);
		if (pc == nullptr) return;

		pc->set_info(&pci);
	}

	void session_impl::set_peer_class_filter(ip_filter const& f)
	{
		INVARIANT_CHECK;
		m_peer_class_filter = f;
	}

	ip_filter const& session_impl::get_peer_class_filter() const
	{
		return m_peer_class_filter;
	}

	void session_impl::set_peer_class_type_filter(peer_class_type_filter f)
	{
		m_peer_class_type_filter = f;
	}

	peer_class_type_filter session_impl::get_peer_class_type_filter()
	{
		return m_peer_class_type_filter;
	}

	void session_impl::set_peer_classes(peer_class_set* s, address const& a, socket_type_t const st)
	{
		std::uint32_t peer_class_mask = m_peer_class_filter.access(a);

		using sock_t = peer_class_type_filter::socket_type_t;
		// assign peer class based on socket type
		static aux::array<sock_t, 10, socket_type_t> const mapping{{{
			sock_t::tcp_socket
			, sock_t::tcp_socket
			, sock_t::tcp_socket
			, sock_t::utp_socket
			, sock_t::i2p_socket
			, sock_t::rtc_socket
			, sock_t::ssl_tcp_socket
			, sock_t::ssl_tcp_socket
			, sock_t::ssl_tcp_socket
			, sock_t::ssl_utp_socket
		}}};
		sock_t const socket_type = mapping[st];
		// filter peer classes based on type
		peer_class_mask = m_peer_class_type_filter.apply(socket_type, peer_class_mask);

		for (peer_class_t i{0}; peer_class_mask; peer_class_mask >>= 1, ++i)
		{
			if ((peer_class_mask & 1) == 0) continue;

			// if you hit this assert, your peer class filter contains
			// a bitmask referencing a non-existent peer class
			TORRENT_ASSERT_PRECOND(m_classes.at(i));

			if (m_classes.at(i) == nullptr) continue;
			s->add_class(m_classes, i);
		}
	}

	bool session_impl::ignore_unchoke_slots_set(peer_class_set const& set) const
	{
		int num = set.num_classes();
		for (int i = 0; i < num; ++i)
		{
			peer_class const* pc = m_classes.at(set.class_at(i));
			if (pc == nullptr) continue;
			if (pc->ignore_unchoke_slots) return true;
		}
		return false;
	}

	bandwidth_manager* session_impl::get_bandwidth_manager(int channel)
	{
		return (channel == peer_connection::download_channel)
			? &m_download_rate : &m_upload_rate;
	}

	void session_impl::deferred_submit_jobs()
	{
		if (m_deferred_submit_disk_jobs) return;
		m_deferred_submit_disk_jobs = true;
		post(m_io_context, make_handler(
			[this] { wrap(&session_impl::submit_disk_jobs); }
			, m_submit_jobs_handler_storage, *this));
	}

	void session_impl::submit_disk_jobs()
	{
		TORRENT_ASSERT(m_deferred_submit_disk_jobs);
		m_deferred_submit_disk_jobs = false;
		m_disk_thread->submit_jobs();
	}

	// copies pointers to bandwidth channels from the peer classes
	// into the array. Only bandwidth channels with a bandwidth limit
	// is considered pertinent and copied
	// returns the number of pointers copied
	// channel is upload_channel or download_channel
	int session_impl::copy_pertinent_channels(peer_class_set const& set
		, int channel, bandwidth_channel** dst, int const max)
	{
		int num_channels = set.num_classes();
		int num_copied = 0;
		for (int i = 0; i < num_channels; ++i)
		{
			peer_class* pc = m_classes.at(set.class_at(i));
			TORRENT_ASSERT(pc);
			if (pc == nullptr) continue;
			bandwidth_channel* chan = &pc->channel[channel];
			// no need to include channels that don't have any bandwidth limits
			if (chan->throttle() == 0) continue;
			dst[num_copied] = chan;
			++num_copied;
			if (num_copied == max) break;
		}
		return num_copied;
	}

	bool session_impl::use_quota_overhead(bandwidth_channel* ch, int amount)
	{
		ch->use_quota(amount);
		return (ch->throttle() > 0 && ch->throttle() < amount);
	}

	std::uint8_t session_impl::use_quota_overhead(peer_class_set& set, int const amount_down, int const amount_up)
	{
		std::uint8_t ret = 0;
		int const num = set.num_classes();
		for (int i = 0; i < num; ++i)
		{
			peer_class* p = m_classes.at(set.class_at(i));
			if (p == nullptr) continue;

			bandwidth_channel* ch = &p->channel[peer_connection::download_channel];
			if (use_quota_overhead(ch, amount_down))
				ret |= 1u << peer_connection::download_channel;
			ch = &p->channel[peer_connection::upload_channel];
			if (use_quota_overhead(ch, amount_up))
				ret |= 1u << peer_connection::upload_channel;
		}
		return ret;
	}

	// session_impl is responsible for deleting 'pack'
	void session_impl::apply_settings_pack(std::shared_ptr<settings_pack> pack)
	{
		INVARIANT_CHECK;
		apply_settings_pack_impl(*pack);
	}

	settings_pack session_impl::get_settings() const
	{
		settings_pack ret;
		// TODO: it would be nice to reserve() these vectors up front
		for (int i = settings_pack::string_type_base;
			i < settings_pack::max_string_setting_internal; ++i)
		{
			ret.set_str(i, m_settings.get_str(i));
		}
		for (int i = settings_pack::int_type_base;
			i < settings_pack::max_int_setting_internal; ++i)
		{
			ret.set_int(i, m_settings.get_int(i));
		}
		for (int i = settings_pack::bool_type_base;
			i < settings_pack::max_bool_setting_internal; ++i)
		{
			ret.set_bool(i, m_settings.get_bool(i));
		}
		return ret;
	}

namespace {
	template <typename Pack>
	int get_setting_impl(Pack const& p, int name, int*)
	{ return p.get_int(name); }

	template <typename Pack>
	bool get_setting_impl(Pack const& p, int name, bool*)
	{ return p.get_bool(name); }

	template <typename Pack>
	std::string get_setting_impl(Pack const& p, int name, std::string*)
	{ return p.get_str(name); }

	template <typename Type, typename Pack>
	Type get_setting(Pack const& p, int name)
	{
		return get_setting_impl(p, name, static_cast<Type*>(nullptr));
	}

	template <typename Type>
	bool setting_changed(settings_pack const& pack, aux::session_settings const& sett, int name)
	{
		return pack.has_val(name)
			&& get_setting<Type>(pack, name) != get_setting<Type>(sett, name);
	}
}

	void session_impl::apply_settings_pack_impl(settings_pack const& pack)
	{
		bool const reopen_listen_port
			= setting_changed<std::string>(pack, m_settings, settings_pack::listen_interfaces)
			|| setting_changed<int>(pack, m_settings, settings_pack::proxy_type)
			|| setting_changed<bool>(pack, m_settings, settings_pack::proxy_peer_connections)
#if TORRENT_ABI_VERSION == 1
			|| setting_changed<int>(pack, m_settings, settings_pack::ssl_listen)
#endif
			;

		bool const update_want_peers
			= setting_changed<bool>(pack, m_settings, settings_pack::seeding_outgoing_connections)
			|| setting_changed<bool>(pack, m_settings, settings_pack::enable_outgoing_tcp)
			|| setting_changed<bool>(pack, m_settings, settings_pack::enable_outgoing_utp)
		;

#ifndef TORRENT_DISABLE_LOGGING
		session_log("applying settings pack, reopen_listen_port=%s"
			, reopen_listen_port ? "true" : "false");
#endif

		apply_pack(&pack, m_settings, this);
		m_disk_thread->settings_updated();

		if (!reopen_listen_port)
		{
			// no need to call this if reopen_listen_port is true
			// since the apply_pack will do it
			update_listen_interfaces();
		}
		else
		{
			reopen_listen_sockets();
		}

		if (update_want_peers)
		{
			for (auto const& t : m_torrents)
				t->update_want_peers();
		}
	}

	std::shared_ptr<listen_socket_t> session_impl::setup_listener(
		listen_endpoint_t const& lep, error_code& ec)
	{
		int retries = m_settings.get_int(settings_pack::max_retry_port_bind);
		tcp::endpoint bind_ep(lep.addr, std::uint16_t(lep.port));

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log("attempting to open listen socket to: %s on device: %s %s%s%s%s%s"
				, print_endpoint(bind_ep).c_str(), lep.device.c_str()
				, (lep.ssl == transport::ssl) ? "ssl " : ""
				, (lep.flags & listen_socket_t::local_network) ? "local-network " : ""
				, (lep.flags & listen_socket_t::accept_incoming) ? "accept-incoming " : "no-incoming "
				, (lep.flags & listen_socket_t::was_expanded) ? "expanded-ip " : ""
				, (lep.flags & listen_socket_t::proxy) ? "proxy " : "");
		}
#endif

		auto ret = std::make_shared<listen_socket_t>();
		ret->ssl = lep.ssl;
		ret->original_port = bind_ep.port();
		ret->flags = lep.flags;
		ret->netmask = lep.netmask;
		operation_t last_op = operation_t::unknown;
		socket_type_t const sock_type
			= (lep.ssl == transport::ssl)
			? socket_type_t::tcp_ssl
			: socket_type_t::tcp;

		// if we're in force-proxy mode, don't open TCP listen sockets. We cannot
		// accept connections on our local machine in this case.
		// TODO: 3 the logic in this if-block should be factored out into a
		// separate function. At least most of it
		if (ret->flags & listen_socket_t::accept_incoming)
		{
			ret->sock = std::make_shared<tcp::acceptor>(m_io_context);
			ret->sock->open(bind_ep.protocol(), ec);
			last_op = operation_t::sock_open;
			if (ec)
			{
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("failed to open socket: %s"
						, ec.message().c_str());
				}
#endif

				if (m_alerts.should_post<listen_failed_alert>())
					m_alerts.emplace_alert<listen_failed_alert>(lep.device, bind_ep, last_op
						, ec, sock_type);
				return ret;
			}

#ifdef TORRENT_WINDOWS
			{
				// this is best-effort. ignore errors
				error_code err;
				ret->sock->set_option(exclusive_address_use(true), err);
#ifndef TORRENT_DISABLE_LOGGING
				if (err && should_log())
				{
					session_log("failed enable exclusive address use on listen socket: %s"
						, err.message().c_str());
				}
#endif // TORRENT_DISABLE_LOGGING
			}
#else

			{
				// this is best-effort. ignore errors
				error_code err;
				ret->sock->set_option(tcp::acceptor::reuse_address(true), err);
#ifndef TORRENT_DISABLE_LOGGING
				if (err && should_log())
				{
					session_log("failed enable reuse-address on listen socket: %s"
						, err.message().c_str());
				}
#endif // TORRENT_DISABLE_LOGGING
			}
#endif // TORRENT_WINDOWS

			if (is_v6(bind_ep))
			{
				error_code err; // ignore errors here
				ret->sock->set_option(boost::asio::ip::v6_only(true), err);
#ifndef TORRENT_DISABLE_LOGGING
				if (err && should_log())
				{
					session_log("failed enable v6 only on listen socket: %s"
						, err.message().c_str());
				}
#endif // LOGGING

#ifdef TORRENT_WINDOWS
				// enable Teredo on windows
				ret->sock->set_option(v6_protection_level(PROTECTION_LEVEL_UNRESTRICTED), err);
#ifndef TORRENT_DISABLE_LOGGING
				if (err && should_log())
				{
					session_log("failed enable IPv6 unrestricted protection level on "
						"listen socket: %s", err.message().c_str());
				}
#endif // TORRENT_DISABLE_LOGGING
#endif // TORRENT_WINDOWS
			}

			if (!lep.device.empty())
			{
				// we have an actual device we're interested in listening on, if we
				// have SO_BINDTODEVICE functionality, use it now.
#if TORRENT_HAS_BINDTODEVICE
				bind_device(*ret->sock, lep.device.c_str(), ec);
#ifndef TORRENT_DISABLE_LOGGING
				if (ec && should_log())
				{
					session_log("bind to device failed (device: %s): %s"
						, lep.device.c_str(), ec.message().c_str());
				}
#endif // TORRENT_DISABLE_LOGGING
				ec.clear();
#endif // TORRENT_HAS_BINDTODEVICE
			}

			ret->sock->bind(bind_ep, ec);
			last_op = operation_t::sock_bind;

			while (ec == error_code(error::address_in_use) && retries > 0)
			{
				TORRENT_ASSERT_VAL(ec, ec);
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("failed to bind listen socket to: %s on device: %s :"
						" [%s] (%d) %s (retries: %d)"
						, print_endpoint(bind_ep).c_str()
						, lep.device.c_str()
						, ec.category().name(), ec.value(), ec.message().c_str()
						, retries);
				}
#endif
				ec.clear();
				--retries;
				bind_ep.port(bind_ep.port() + 1);
				ret->sock->bind(bind_ep, ec);
			}

			if (ec == error_code(error::address_in_use)
				&& m_settings.get_bool(settings_pack::listen_system_port_fallback)
				&& bind_ep.port() != 0)
			{
				// instead of giving up, try let the OS pick a port
				bind_ep.port(0);
				ec.clear();
				ret->sock->bind(bind_ep, ec);
				last_op = operation_t::sock_bind;
			}

			if (ec)
			{
				// not even that worked, give up

#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("failed to bind listen socket to: %s on device: %s :"
						" [%s] (%d) %s (giving up)"
						, print_endpoint(bind_ep).c_str()
						, lep.device.c_str()
						, ec.category().name(), ec.value(), ec.message().c_str());
				}
#endif
				if (m_alerts.should_post<listen_failed_alert>())
				{
					m_alerts.emplace_alert<listen_failed_alert>(lep.device, bind_ep
						, last_op, ec, sock_type);
				}
				ret->sock.reset();
				return ret;
			}
			ret->local_endpoint = ret->sock->local_endpoint(ec);
			last_op = operation_t::getname;
			if (ec)
			{
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("get_sockname failed on listen socket: %s"
						, ec.message().c_str());
				}
#endif
				if (m_alerts.should_post<listen_failed_alert>())
				{
					m_alerts.emplace_alert<listen_failed_alert>(lep.device, bind_ep
						, last_op, ec, sock_type);
				}
				return ret;
			}

			TORRENT_ASSERT(ret->local_endpoint.port() == bind_ep.port()
				|| bind_ep.port() == 0);

			if (bind_ep.port() == 0) bind_ep = ret->local_endpoint;

			ret->sock->listen(m_settings.get_int(settings_pack::listen_queue_size), ec);
			last_op = operation_t::sock_listen;

			if (ec)
			{
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("cannot listen on interface \"%s\": %s"
						, lep.device.c_str(), ec.message().c_str());
				}
#endif
				if (m_alerts.should_post<listen_failed_alert>())
				{
					m_alerts.emplace_alert<listen_failed_alert>(lep.device, bind_ep
						, last_op, ec, sock_type);
				}
				return ret;
			}
		} // accept incoming

		socket_type_t const udp_sock_type
			= (lep.ssl == transport::ssl)
			? socket_type_t::utp_ssl
			: socket_type_t::utp;
		udp::endpoint udp_bind_ep(bind_ep.address(), bind_ep.port());

		ret->udp_sock = std::make_shared<session_udp_socket>(m_io_context, ret);
		ret->udp_sock->sock.open(udp_bind_ep.protocol(), ec);
		if (ec)
		{
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("failed to open UDP socket: %s: %s"
					, lep.device.c_str(), ec.message().c_str());
			}
#endif

			last_op = operation_t::sock_open;
			if (m_alerts.should_post<listen_failed_alert>())
				m_alerts.emplace_alert<listen_failed_alert>(lep.device
					, bind_ep, last_op, ec, udp_sock_type);

			return ret;
		}

#if TORRENT_HAS_BINDTODEVICE
		if (!lep.device.empty())
		{
			bind_device(ret->udp_sock->sock, lep.device.c_str(), ec);
#ifndef TORRENT_DISABLE_LOGGING
			if (ec && should_log())
			{
				session_log("bind to device failed (device: %s): %s"
					, lep.device.c_str(), ec.message().c_str());
			}
#endif // TORRENT_DISABLE_LOGGING
			ec.clear();
		}
#endif
		ret->udp_sock->sock.bind(udp_bind_ep, ec);

		while (ec == error_code(error::address_in_use) && retries > 0)
		{
			TORRENT_ASSERT_VAL(ec, ec);
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("failed to bind udp socket to: %s on device: %s :"
					" [%s] (%d) %s (retries: %d)"
					, print_endpoint(bind_ep).c_str()
					, lep.device.c_str()
					, ec.category().name(), ec.value(), ec.message().c_str()
					, retries);
			}
#endif
			ec.clear();
			--retries;
			udp_bind_ep.port(udp_bind_ep.port() + 1);
			ret->udp_sock->sock.bind(udp_bind_ep, ec);
		}

		if (ec == error_code(error::address_in_use)
			&& m_settings.get_bool(settings_pack::listen_system_port_fallback)
			&& udp_bind_ep.port() != 0)
		{
			// instead of giving up, try let the OS pick a port
			udp_bind_ep.port(0);
			ec.clear();
			ret->udp_sock->sock.bind(udp_bind_ep, ec);
		}

		last_op = operation_t::sock_bind;
		if (ec)
		{
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("failed to bind UDP socket: %s: %s"
					, lep.device.c_str(), ec.message().c_str());
			}
#endif

			if (m_alerts.should_post<listen_failed_alert>())
				m_alerts.emplace_alert<listen_failed_alert>(lep.device
					, bind_ep, last_op, ec, udp_sock_type);

			return ret;
		}

		// if we did not open a TCP listen socket, ret->local_endpoint was never
		// initialized, so do that now, based on the UDP socket
		if (!(ret->flags & listen_socket_t::accept_incoming))
		{
			auto const udp_ep = ret->udp_sock->local_endpoint();
			ret->local_endpoint = tcp::endpoint(udp_ep.address(), udp_ep.port());
		}

		ret->device = lep.device;

		error_code err;
		set_socket_buffer_size(ret->udp_sock->sock, m_settings, err);
		if (err)
		{
			if (m_alerts.should_post<udp_error_alert>())
				m_alerts.emplace_alert<udp_error_alert>(ret->udp_sock->sock.local_endpoint(ec)
					, operation_t::alloc_recvbuf, err);
		}

		// this call is necessary here because, unless the settings actually
		// change after the session is up and listening, at no other point
		// set_proxy_settings is called with the correct proxy configuration,
		// internally, this method handle the SOCKS5's connection logic
		ret->udp_sock->sock.set_proxy_settings(proxy(), m_alerts);

		ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet");
		ret->udp_sock->sock.async_read(aux::make_handler([this, ret](error_code const& e)
			{ this->on_udp_packet(ret->udp_sock, ret, ret->ssl, e); }
			, ret->udp_handler_storage, *this));

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log(" listening on: %s TCP port: %d UDP port: %d"
				, bind_ep.address().to_string().c_str()
				, ret->tcp_external_port(), ret->udp_external_port());
		}
#endif
		return ret;
	}

	void session_impl::on_exception(std::exception const& e)
	{
		TORRENT_UNUSED(e);
#ifndef TORRENT_DISABLE_LOGGING
		session_log("FATAL SESSION ERROR [%s]", e.what());
#endif
		this->abort();
	}

	void session_impl::on_error(error_code const& ec)
	{
		TORRENT_UNUSED(ec);
#ifndef TORRENT_DISABLE_LOGGING
		session_log("FATAL SESSION ERROR (%s : %d) [%s]"
			, ec.category().name(), ec.value(), ec.message().c_str());
#endif
		this->abort();
	}

	void session_impl::on_ip_change(error_code const& ec)
	{
#ifndef TORRENT_DISABLE_LOGGING
		if (!ec)
			session_log("received ip change from internal ip_notifier");
		else
			session_log("received error on_ip_change: %d, %s", ec.value(), ec.message().c_str());
#endif
		if (ec || m_abort || !m_ip_notifier) return;
		m_ip_notifier->async_wait([this] (error_code const& e)
			{ wrap(&session_impl::on_ip_change, e); });
		reopen_network_sockets({});
	}

	// TODO: could this function be merged with expand_unspecified_addresses?
	// right now both listen_endpoint_t and listen_interface_t are almost
	// identical, maybe the latter could be removed too
	void interface_to_endpoints(listen_interface_t const& iface
		, listen_socket_flags_t flags
		, span<ip_interface const> const ifs
		, std::vector<listen_endpoint_t>& eps)
	{
		flags |= iface.local ? listen_socket_t::local_network : listen_socket_flags_t{};
		transport const ssl = iface.ssl ? transport::ssl : transport::plaintext;

		// First, check to see if it's an IP address
		error_code err;
		address const adr = make_address(iface.device.c_str(), err);
		if (!err)
		{
			eps.emplace_back(adr, iface.port, std::string{}, ssl, flags);
		}
		else
		{
			flags |= listen_socket_t::was_expanded;

			// this is the case where device names a network device. We need to
			// enumerate all IPs associated with this device
			for (auto const& ipface : ifs)
			{
				// we're looking for a specific interface, and its address
				// (which must be of the same family as the address we're
				// connecting to)
				if (iface.device != ipface.name) continue;

				bool const local = iface.local
					|| ipface.interface_address.is_loopback()
					|| is_link_local(ipface.interface_address);

				eps.emplace_back(ipface.interface_address, iface.port, iface.device
					, ssl, flags | (local ? listen_socket_t::local_network : listen_socket_flags_t{}));
			}
		}
	}

	void session_impl::reopen_listen_sockets(bool const map_ports)
	{
#ifndef TORRENT_DISABLE_LOGGING
		session_log("reopen listen sockets");
#endif

		TORRENT_ASSERT(is_single_thread());

		TORRENT_ASSERT(!m_abort);

		error_code ec;

		if (m_abort) return;

		// first build a list of endpoints we should be listening on
		// we need to remove any unneeded sockets first to avoid the possibility
		// of a new socket failing to bind due to a conflict with a stale socket
		std::vector<listen_endpoint_t> eps;

		// if we don't proxy peer connections, don't apply the special logic for
		// proxies
		if (m_settings.get_int(settings_pack::proxy_type) != settings_pack::none
			&& m_settings.get_bool(settings_pack::proxy_peer_connections))
		{
			// we will be able to accept incoming connections over UDP. so use
			// one of the ports the user specified to use a consistent port
			// across sessions. If the user did not specify any ports, pick one
			// at random
			int const port = m_listen_interfaces.empty()
				? int(random(63000) + 2000)
				: m_listen_interfaces.front().port;
			listen_endpoint_t ep(address_v4::any(), port, {}
				, transport::plaintext, listen_socket_t::proxy);
			eps.emplace_back(ep);
		}
		else
		{
			std::vector<ip_interface> const ifs = enum_net_interfaces(m_io_context, ec);
			if (ec && m_alerts.should_post<listen_failed_alert>())
			{
				m_alerts.emplace_alert<listen_failed_alert>(""
					, operation_t::enum_if, ec, socket_type_t::tcp);
			}
			auto const routes = enum_routes(m_io_context, ec);
			if (ec && m_alerts.should_post<listen_failed_alert>())
			{
				m_alerts.emplace_alert<listen_failed_alert>(""
					, operation_t::enum_route, ec, socket_type_t::tcp);
			}

			// expand device names and populate eps
			for (auto const& iface : m_listen_interfaces)
			{
#if !TORRENT_USE_SSL
				if (iface.ssl)
				{
#ifndef TORRENT_DISABLE_LOGGING
					session_log("attempted to listen ssl with no library support on device: \"%s\""
						, iface.device.c_str());
#endif
					if (m_alerts.should_post<listen_failed_alert>())
					{
						m_alerts.emplace_alert<listen_failed_alert>(iface.device
							, operation_t::sock_open
							, boost::asio::error::operation_not_supported
							, socket_type_t::tcp_ssl);
					}
					continue;
				}
#endif

				// now we have a device to bind to. This device may actually just be an
				// IP address or a device name. In case it's a device name, we want to
				// (potentially) end up binding a socket for each IP address associated
				// with that device.
				interface_to_endpoints(iface, listen_socket_t::accept_incoming, ifs, eps);
			}

			if (eps.empty())
			{
#ifndef TORRENT_DISABLE_LOGGING
				session_log("no listen sockets");
#endif
			}

			expand_unspecified_address(ifs, routes, eps);
			expand_devices(ifs, eps);
		}

		auto remove_iter = partition_listen_sockets(eps, m_listen_sockets);

		while (remove_iter != m_listen_sockets.end())
		{
#ifndef TORRENT_DISABLE_DHT
			if (m_dht)
				m_dht->delete_socket(*remove_iter);
#endif

#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("closing listen socket for %s on device \"%s\""
					, print_endpoint((*remove_iter)->local_endpoint).c_str()
					, (*remove_iter)->device.c_str());
			}
#endif
			if ((*remove_iter)->sock) (*remove_iter)->sock->close(ec);
			if ((*remove_iter)->udp_sock) (*remove_iter)->udp_sock->sock.close();
			if ((*remove_iter)->natpmp_mapper) (*remove_iter)->natpmp_mapper->close();
			if ((*remove_iter)->upnp_mapper) (*remove_iter)->upnp_mapper->close();
			if ((*remove_iter)->lsd) (*remove_iter)->lsd->close();
			remove_iter = m_listen_sockets.erase(remove_iter);
		}

		// all sockets in there stayed the same. Only sockets after this point are
		// new and should post alerts
		int const existing_sockets = int(m_listen_sockets.size());

		m_stats_counters.set_value(counters::has_incoming_connections
			, std::any_of(m_listen_sockets.begin(), m_listen_sockets.end()
				, [](std::shared_ptr<listen_socket_t> const& l)
				{ return l->incoming_connection; }));

		// open new sockets on any endpoints that didn't match with
		// an existing socket
		for (auto const& ep : eps)
#ifndef BOOST_NO_EXCEPTIONS
			try
#endif
		{
			std::shared_ptr<listen_socket_t> s = setup_listener(ep, ec);

			if (!ec && (s->sock || s->udp_sock))
			{
				m_listen_sockets.emplace_back(s);

#ifndef TORRENT_DISABLE_DHT
				if (m_dht
					&& s->ssl != transport::ssl
					&& !(s->flags & listen_socket_t::local_network))
				{
					m_dht->new_socket(m_listen_sockets.back());
				}
#endif

				TORRENT_ASSERT(bool(s->flags & listen_socket_t::accept_incoming) == bool(s->sock));
				if (s->sock) async_accept(s->sock, s->ssl);
			}
		}
#ifndef BOOST_NO_EXCEPTIONS
		catch (std::exception const& e)
		{
			TORRENT_UNUSED(e);
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("setup_listener(%s) device: %s failed: %s"
					, print_endpoint(ep.addr, ep.port).c_str()
					, ep.device.c_str()
					, e.what());
			}
#endif // TORRENT_DISABLE_LOGGING
		}
#endif // BOOST_NO_EXCEPTIONS

		if (m_listen_sockets.empty())
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("giving up on binding listen sockets");
#endif
			return;
		}

		auto const new_sockets = span<std::shared_ptr<listen_socket_t>>(
			m_listen_sockets).subspan(existing_sockets);

		// now, send out listen_succeeded_alert for the listen sockets we are
		// listening on
		if (m_alerts.should_post<listen_succeeded_alert>())
		{
			for (auto const& l : new_sockets)
			{
				error_code err;
				if (l->sock)
				{
					tcp::endpoint const tcp_ep = l->sock->local_endpoint(err);
					if (!err)
					{
						socket_type_t const socket_type
							= l->ssl == transport::ssl
							? socket_type_t::tcp_ssl
							: socket_type_t::tcp;

						m_alerts.emplace_alert<listen_succeeded_alert>(
							tcp_ep, socket_type);
					}
				}

				if (l->udp_sock)
				{
					udp::endpoint const udp_ep = l->udp_sock->sock.local_endpoint(err);
					if (!err && l->udp_sock->sock.is_open())
					{
						socket_type_t const socket_type
							= l->ssl == transport::ssl
							? socket_type_t::utp_ssl
							: socket_type_t::utp;

						m_alerts.emplace_alert<listen_succeeded_alert>(
							udp_ep, socket_type);
					}
				}
			}
		}

		if (m_settings.get_int(settings_pack::peer_tos) != 0)
		{
			update_peer_tos();
		}

		ec.clear();

		if (m_settings.get_bool(settings_pack::enable_natpmp))
		{
			for (auto const& s : new_sockets)
				start_natpmp(s);
		}

		if (m_settings.get_bool(settings_pack::enable_upnp))
		{
			for (auto const& s : new_sockets)
				start_upnp(s);
		}

		if (map_ports)
		{
			for (auto const& s : m_listen_sockets)
				remap_ports(remap_natpmp_and_upnp, *s);
		}
		else
		{
			// new sockets need to map ports even if the caller did not request
			// re-mapping
			for (auto const& s : new_sockets)
				remap_ports(remap_natpmp_and_upnp, *s);
		}

		update_lsd();

#if TORRENT_USE_I2P
		open_new_incoming_i2p_connection();
#endif

		// trackers that were not reachable, may have become reachable now.
		// so clear the "disabled" flags to let them be tried one more time
		// TODO: it would probably be better to do this by having a
		// listen-socket "version" number that gets bumped. And instead of
		// setting a bool to disable a tracker, we set the version number that
		// it was disabled at. This change would affect the ABI in 1.2, so
		// should be done in 2.0 or later
		for (auto& t : m_torrents)
			t->enable_all_trackers();
	}

	void session_impl::reopen_network_sockets(reopen_network_flags_t const options)
	{
		reopen_listen_sockets(bool(options & session_handle::reopen_map_ports));
	}

	namespace {
		template <typename MapProtocol, typename ProtoType, typename EndpointType>
		void map_port(MapProtocol& m, ProtoType protocol, EndpointType const& ep
			, port_mapping_t& map_handle)
		{
			if (map_handle != port_mapping_t{-1}) m.delete_mapping(map_handle);
			map_handle = port_mapping_t{-1};

			address const addr = ep.address();
			// with IPv4 the interface might be behind NAT so we can't skip them
			// based on the scope of the local address
			if (addr.is_v6() && is_local(addr))
				return;

			// only update this mapping if we actually have a socket listening
			if (ep != EndpointType())
				map_handle = m.add_mapping(protocol, ep.port(), ep);
		}
	}

	void session_impl::remap_ports(remap_port_mask_t const mask
		, listen_socket_t& s)
	{
		tcp::endpoint const tcp_ep = s.sock ? s.sock->local_endpoint() : tcp::endpoint();
		udp::endpoint const udp_ep = s.udp_sock ? s.udp_sock->sock.local_endpoint() : udp::endpoint();

		if ((mask & remap_natpmp) && s.natpmp_mapper)
		{
			map_port(*s.natpmp_mapper, portmap_protocol::tcp, tcp_ep
				, s.tcp_port_mapping[portmap_transport::natpmp].mapping);
			map_port(*s.natpmp_mapper, portmap_protocol::udp, make_tcp(udp_ep)
				, s.udp_port_mapping[portmap_transport::natpmp].mapping);
		}
		if ((mask & remap_upnp) && s.upnp_mapper)
		{
			map_port(*s.upnp_mapper, portmap_protocol::tcp, tcp_ep
				, s.tcp_port_mapping[portmap_transport::upnp].mapping);
			map_port(*s.upnp_mapper, portmap_protocol::udp, make_tcp(udp_ep)
				, s.udp_port_mapping[portmap_transport::upnp].mapping);
		}
	}

	void session_impl::update_i2p_bridge()
	{
		// we need this socket to be open before we
		// can make name lookups for trackers for instance.
		// pause the session now and resume it once we've
		// established the i2p SAM connection
#if TORRENT_USE_I2P
		if (m_settings.get_str(settings_pack::i2p_hostname).empty())
		{
			error_code ec;
			m_i2p_conn.close(ec);
			return;
		}
		m_i2p_conn.open(m_settings.get_str(settings_pack::i2p_hostname)
			, m_settings.get_int(settings_pack::i2p_port)
			, std::bind(&session_impl::on_i2p_open, this, _1));
#endif
	}

#ifndef TORRENT_DISABLE_DHT
	int session_impl::external_udp_port(address const& local_address) const
	{
		auto ls = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
			, [&](std::shared_ptr<listen_socket_t> const& e)
		{
			return e->local_endpoint.address() == local_address;
		});

		if (ls != m_listen_sockets.end())
			return (*ls)->udp_external_port();
		else
			return -1;
	}
#endif

#if TORRENT_USE_I2P

	proxy_settings session_impl::i2p_proxy() const
	{
		proxy_settings ret;

		ret.hostname = m_settings.get_str(settings_pack::i2p_hostname);
		ret.type = settings_pack::i2p_proxy;
		ret.port = std::uint16_t(m_settings.get_int(settings_pack::i2p_port));
		return ret;
	}

	void session_impl::on_i2p_open(error_code const& ec)
	{
		if (ec)
		{
			if (m_alerts.should_post<i2p_alert>())
				m_alerts.emplace_alert<i2p_alert>(ec);

#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
				session_log("i2p open failed (%d) %s", ec.value(), ec.message().c_str());
#endif
		}
		// now that we have our i2p connection established
		// it's OK to start torrents and use this socket to
		// do i2p name lookups

		open_new_incoming_i2p_connection();
	}

	void session_impl::open_new_incoming_i2p_connection()
	{
		if (!m_i2p_conn.is_open()) return;

		if (m_i2p_listen_socket) return;
		m_i2p_listen_socket.emplace(
			instantiate_connection(m_io_context, m_i2p_conn.proxy()
				, nullptr, nullptr, true, false));

		ADD_OUTSTANDING_ASYNC("session_impl::on_i2p_accept");
		auto& s = std::get<i2p_stream>(*m_i2p_listen_socket);
		s.set_command(i2p_stream::cmd_accept);
		s.set_session_id(m_i2p_conn.session_id());

		s.async_connect(tcp::endpoint()
			, std::bind(&session_impl::on_i2p_accept, this, _1));
	}

	void session_impl::on_i2p_accept(error_code const& e)
	{
		COMPLETE_ASYNC("session_impl::on_i2p_accept");
		if (e == boost::asio::error::operation_aborted) return;
		if (e)
		{
			if (m_alerts.should_post<listen_failed_alert>())
			{
				m_alerts.emplace_alert<listen_failed_alert>("i2p"
					, operation_t::sock_accept
					, e, socket_type_t::i2p);
			}
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
				session_log("i2p SAM connection failure: %s", e.message().c_str());
#endif
			return;
		}
		open_new_incoming_i2p_connection();
		incoming_connection(std::move(*m_i2p_listen_socket));
		m_i2p_listen_socket.reset();
	}
#endif

	void session_impl::send_udp_packet_hostname(std::weak_ptr<utp_socket_interface> sock
		, char const* hostname
		, int const port
		, span<char const> p
		, error_code& ec
		, udp_send_flags_t const flags)
	{
		auto si = sock.lock();
		if (!si)
		{
			ec = boost::asio::error::bad_descriptor;
			return;
		}

		auto s = std::static_pointer_cast<aux::listen_socket_t>(si)->udp_sock;

		s->sock.send_hostname(hostname, port, p, ec, flags);

		if ((ec == error::would_block || ec == error::try_again)
			&& !s->write_blocked)
		{
			s->write_blocked = true;
			ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
			s->sock.async_write(std::bind(&session_impl::on_udp_writeable
				, this, s, _1));
		}
	}

	void session_impl::send_udp_packet(std::weak_ptr<utp_socket_interface> sock
		, udp::endpoint const& ep
		, span<char const> p
		, error_code& ec
		, udp_send_flags_t const flags)
	{
		auto si = sock.lock();
		if (!si)
		{
			ec = boost::asio::error::bad_descriptor;
			return;
		}

		auto s = std::static_pointer_cast<aux::listen_socket_t>(si)->udp_sock;

		TORRENT_ASSERT(s->sock.is_closed() || s->sock.local_endpoint().protocol() == ep.protocol());

		s->sock.send(ep, p, ec, flags);

		if ((ec == error::would_block || ec == error::try_again) && !s->write_blocked)
		{
			s->write_blocked = true;
			ADD_OUTSTANDING_ASYNC("session_impl::on_udp_writeable");
			s->sock.async_write(std::bind(&session_impl::on_udp_writeable
				, this, s, _1));
		}
	}

	void session_impl::on_udp_writeable(std::weak_ptr<session_udp_socket> sock, error_code const& ec)
	{
		COMPLETE_ASYNC("session_impl::on_udp_writeable");
		if (ec) return;

		auto s = sock.lock();
		if (!s) return;

		s->write_blocked = false;

#ifdef TORRENT_SSL_PEERS
		auto i = std::find_if(
			m_listen_sockets.begin(), m_listen_sockets.end()
			, [&s] (std::shared_ptr<listen_socket_t> const& ls) { return ls->udp_sock == s; });
#endif

		// notify the utp socket manager it can start sending on the socket again
		struct utp_socket_manager& mgr =
#ifdef TORRENT_SSL_PEERS
			(i != m_listen_sockets.end() && (*i)->ssl == transport::ssl) ? m_ssl_utp_socket_manager :
#endif
			m_utp_socket_manager;

		mgr.writable();
	}


	void session_impl::on_udp_packet(std::weak_ptr<session_udp_socket> socket
		, std::weak_ptr<listen_socket_t> ls, transport const ssl, error_code const& ec)
	{
		COMPLETE_ASYNC("session_impl::on_udp_packet");
		if (ec)
		{
			std::shared_ptr<session_udp_socket> s = socket.lock();
			udp::endpoint ep;
			if (s) ep = s->local_endpoint();

			// don't bubble up operation aborted errors to the user
			if (ec != boost::asio::error::operation_aborted
				&& ec != boost::asio::error::bad_descriptor
				&& m_alerts.should_post<udp_error_alert>())
			{
				m_alerts.emplace_alert<udp_error_alert>(ep
					, operation_t::sock_read, ec);
			}

#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("UDP error: %s (%d) %s"
					, print_endpoint(ep).c_str(), ec.value(), ec.message().c_str());
			}
#endif
			return;
		}

		m_stats_counters.inc_stats_counter(counters::on_udp_counter);

		std::shared_ptr<session_udp_socket> s = socket.lock();
		if (!s) return;

		struct utp_socket_manager& mgr =
#ifdef TORRENT_SSL_PEERS
			ssl == transport::ssl ? m_ssl_utp_socket_manager :
#endif
			m_utp_socket_manager;

		auto listen_socket = ls.lock();
		if (listen_socket)
			listen_socket->incoming_connection = true;

		for (;;)
		{
			aux::array<udp_socket::packet, 50> p;
			error_code err;
			int const num_packets = s->sock.read(p, err);

			for (udp_socket::packet& packet : span<udp_socket::packet>(p).first(num_packets))
			{
				if (packet.error)
				{
					// TODO: 3 it would be neat if the utp socket manager would
					// handle ICMP errors too

#ifndef TORRENT_DISABLE_DHT
					if (m_dht)
						m_dht->incoming_error(packet.error, packet.from);
#endif

					m_tracker_manager.incoming_error(packet.error, packet.from);
					continue;
				}

				span<char const> const buf = packet.data;

				// give the uTP socket manager first dibs on the packet. Presumably
				// the majority of packets are uTP packets.
				if (!mgr.incoming_packet(ls, packet.from, buf))
				{
					// if it wasn't a uTP packet, try the other users of the UDP
					// socket
					bool handled = false;
#ifndef TORRENT_DISABLE_DHT
					if (m_dht && buf.size() > 20
						&& buf.front() == 'd'
						&& buf.back() == 'e'
						&& listen_socket)
					{
						handled = m_dht->incoming_packet(listen_socket, packet.from, buf);
					}
#endif

					if (!handled)
					{
						m_tracker_manager.incoming_packet(packet.from, buf);
					}
				}
			}

			if (err == error::would_block || err == error::try_again)
			{
				// there are no more packets on the socket
				break;
			}

			if (err)
			{
				udp::endpoint const ep = s->local_endpoint();

				if (err != boost::asio::error::operation_aborted
					&& m_alerts.should_post<udp_error_alert>())
					m_alerts.emplace_alert<udp_error_alert>(ep
						, operation_t::sock_read, err);

#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("UDP error: %s (%d) %s"
						, print_endpoint(ep).c_str(), ec.value(), ec.message().c_str());
				}
#endif

				// any error other than these ones are considered fatal errors, and
				// we won't read from the socket again
				if (err != boost::asio::error::host_unreachable
					&& err != boost::asio::error::fault
					&& err != boost::asio::error::connection_reset
					&& err != boost::asio::error::connection_refused
					&& err != boost::asio::error::connection_aborted
					&& err != boost::asio::error::operation_aborted
					&& err != boost::asio::error::network_reset
					&& err != boost::asio::error::network_unreachable
#ifdef _WIN32
					// ERROR_MORE_DATA means the same thing as EMSGSIZE
					&& err != error_code(ERROR_MORE_DATA, system_category())
					&& err != error_code(ERROR_HOST_UNREACHABLE, system_category())
					&& err != error_code(ERROR_PORT_UNREACHABLE, system_category())
					&& err != error_code(ERROR_RETRY, system_category())
					&& err != error_code(ERROR_NETWORK_UNREACHABLE, system_category())
					&& err != error_code(ERROR_CONNECTION_REFUSED, system_category())
					&& err != error_code(ERROR_CONNECTION_ABORTED, system_category())
#endif
					&& err != boost::asio::error::message_size)
				{
					// fatal errors. Don't try to read from this socket again
					mgr.socket_drained();
					return;
				}
				// non-fatal UDP errors get here, we should re-issue the read.
				continue;
			}
		}

		mgr.socket_drained();

		ADD_OUTSTANDING_ASYNC("session_impl::on_udp_packet");
		s->sock.async_read(make_handler([this, socket, ls, ssl](error_code const& e)
			{ this->on_udp_packet(std::move(socket), std::move(ls), ssl, e); }
			, s->udp_handler_storage, *this));
	}

	void session_impl::async_accept(std::shared_ptr<tcp::acceptor> const& listener
		, transport const ssl)
#ifndef BOOST_NO_EXCEPTIONS
	try
#endif
	{
		TORRENT_ASSERT(!m_abort);

		std::weak_ptr<tcp::acceptor> ls(listener);
		m_stats_counters.inc_stats_counter(counters::num_outstanding_accept);
		ADD_OUTSTANDING_ASYNC("session_impl::on_accept_connection");
		listener->async_accept([this, ls, ssl] (error_code const& ec, true_tcp_socket s)
			{ return wrap(&session_impl::on_accept_connection, std::move(s), ec, ls, ssl); });
	}
#ifndef BOOST_NO_EXCEPTIONS
	catch (system_error const& e) {
		alerts().emplace_alert<session_error_alert>(e.code(), e.what());
		pause();
	} catch (std::exception const& e) {
		alerts().emplace_alert<session_error_alert>(error_code(), e.what());
		pause();
	} catch (...) {
		alerts().emplace_alert<session_error_alert>(error_code(), "unknown error");
		pause();
	}
#endif

	void session_impl::on_accept_connection(true_tcp_socket s, error_code const& e
		, std::weak_ptr<tcp::acceptor> listen_socket, transport const ssl)
	{
		COMPLETE_ASYNC("session_impl::on_accept_connection");
		m_stats_counters.inc_stats_counter(counters::on_accept_counter);
		m_stats_counters.inc_stats_counter(counters::num_outstanding_accept, -1);

		TORRENT_ASSERT(is_single_thread());
		std::shared_ptr<tcp::acceptor> listener = listen_socket.lock();
		if (!listener) return;

		if (e == boost::asio::error::operation_aborted) return;

		if (m_abort) return;

		error_code ec;
		if (e)
		{
			tcp::endpoint const ep = listener->local_endpoint(ec);
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("error accepting connection on '%s': %s"
					, print_endpoint(ep).c_str(), e.message().c_str());
			}
#endif
#ifdef TORRENT_WINDOWS
			// Windows sometimes generates this error. It seems to be
			// non-fatal and we have to do another async_accept.
			if (e.value() == ERROR_SEM_TIMEOUT)
			{
				async_accept(listener, ssl);
				return;
			}
#endif
#ifdef TORRENT_BSD
			// Leopard sometimes generates an "invalid argument" error. It seems to be
			// non-fatal and we have to do another async_accept.
			if (e.value() == EINVAL)
			{
				async_accept(listener, ssl);
				return;
			}
#endif
			if (e == boost::system::errc::too_many_files_open)
			{
				// if we failed to accept an incoming connection
				// because we have too many files open, try again
				// and lower the number of file descriptors used
				// elsewhere.
				if (m_settings.get_int(settings_pack::connections_limit) > 10)
				{
					// now, disconnect a random peer
					auto const i = std::max_element(m_torrents.begin(), m_torrents.end()
						, [](std::shared_ptr<torrent> const& lhs, std::shared_ptr<torrent> const& rhs)
						{ return lhs->num_peers() < rhs->num_peers(); });

					if (m_alerts.should_post<performance_alert>())
						m_alerts.emplace_alert<performance_alert>(
							torrent_handle(), performance_alert::too_few_file_descriptors);

					if (i != m_torrents.end())
					{
						(*i)->disconnect_peers(1, e);
					}

					m_settings.set_int(settings_pack::connections_limit
						, std::max(10, int(m_connections.size())));
				}
				// try again, but still alert the user of the problem
				async_accept(listener, ssl);
			}
			if (m_alerts.should_post<listen_failed_alert>())
			{
				m_alerts.emplace_alert<listen_failed_alert>(ep.address().to_string()
					, ep, operation_t::sock_accept, e
					, ssl == transport::ssl ? socket_type_t::tcp_ssl : socket_type_t::tcp);
			}
			return;
		}
		async_accept(listener, ssl);

		// don't accept any connections from our local sockets if we're using a
		// proxy
		if (m_settings.get_int(settings_pack::proxy_type) != settings_pack::none)
			return;

		auto listen = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
			, [&listener](std::shared_ptr<listen_socket_t> const& l)
		{ return l->sock == listener; });
		if (listen != m_listen_sockets.end())
			(*listen)->incoming_connection = true;

		socket_type c = [&]{
#ifdef TORRENT_SSL_PEERS
			if (ssl == transport::ssl)
			{
				// accept connections initializing the SSL connection to use the peer
				// ssl context. Since it has the servername callback set on it, we will
				// switch away from this context into a specific torrent once we start
				// handshaking
				return socket_type(ssl_stream<tcp::socket>(tcp::socket(std::move(s)), m_peer_ssl_ctx));
			}
			else
#endif
			{
				return socket_type(tcp::socket(std::move(s)));
			}
		}();

#ifdef TORRENT_SSL_PEERS
		TORRENT_ASSERT((ssl == transport::ssl) == is_ssl(c));
#endif

#ifdef TORRENT_SSL_PEERS
		if (ssl == transport::ssl)
		{
			TORRENT_ASSERT(is_ssl(c));

			// save the socket so we can cancel the handshake
			// TODO: this size need to be capped
			auto iter = m_incoming_sockets.emplace(std::make_unique<socket_type>(std::move(c))).first;

			auto sock = iter->get();
			// for SSL connections, incoming_connection() is called
			// after the handshake is done
			ADD_OUTSTANDING_ASYNC("session_impl::ssl_handshake");
			std::get<ssl_stream<tcp::socket>>(**iter).async_accept_handshake(
				[this, sock] (error_code const& err) { ssl_handshake(err, sock); });
		}
		else
#endif
		{
			incoming_connection(std::move(c));
		}
	}

#ifdef TORRENT_SSL_PEERS

	void session_impl::on_incoming_utp_ssl(socket_type s)
	{
		TORRENT_ASSERT(is_ssl(s));

		// save the socket so we can cancel the handshake

		// TODO: this size need to be capped
		auto iter = m_incoming_sockets.emplace(std::make_unique<socket_type>(std::move(s))).first;
		auto sock = iter->get();

		// for SSL connections, incoming_connection() is called
		// after the handshake is done
		ADD_OUTSTANDING_ASYNC("session_impl::ssl_handshake");
		std::get<ssl_stream<utp_stream>>(**iter).async_accept_handshake(
			[this, sock] (error_code const& err) { ssl_handshake(err, sock); });
	}

	// to test SSL connections, one can use this openssl command template:
	//
	// openssl s_client -cert <client-cert>.pem -key <client-private-key>.pem
	//   -CAfile <torrent-cert>.pem  -debug -connect 127.0.0.1:4433 -tls1
	//   -servername <hex-encoded-info-hash>

	void session_impl::ssl_handshake(error_code const& ec, socket_type* sock)
	{
		COMPLETE_ASYNC("session_impl::ssl_handshake");

		auto iter = m_incoming_sockets.find(sock);

		// this happens if the SSL connection is aborted because we're shutting
		// down
		if (iter == m_incoming_sockets.end()) return;

		socket_type s(std::move(**iter));
		TORRENT_ASSERT(is_ssl(s));
		m_incoming_sockets.erase(iter);

		error_code e;
		tcp::endpoint endp = s.remote_endpoint(e);
		if (e) return;

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log(" *** peer SSL handshake done [ ip: %s ec: %s socket: %s ]"
				, print_endpoint(endp).c_str(), ec.message().c_str(), socket_type_name(s));
		}
#endif

		if (ec)
		{
			if (m_alerts.should_post<peer_error_alert>())
			{
				m_alerts.emplace_alert<peer_error_alert>(torrent_handle(), endp
					, peer_id(), operation_t::ssl_handshake, ec);
			}
			return;
		}

		incoming_connection(std::move(s));
	}

#endif // TORRENT_SSL_PEERS

	void session_impl::incoming_connection(socket_type s)
	{
		TORRENT_ASSERT(is_single_thread());

		if (m_abort)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log(" <== INCOMING CONNECTION [ ignored, aborting ]");
#endif
			return;
		}

		if (m_paused)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log(" <== INCOMING CONNECTION [ ignored, paused ]");
#endif
			return;
		}

		error_code ec;
		// we got a connection request!
		tcp::endpoint endp = s.remote_endpoint(ec);

		if (ec)
		{
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log(" <== INCOMING CONNECTION [ rejected, could "
					"not retrieve remote endpoint: %s ]"
					, print_error(ec).c_str());
			}
#endif
			return;
		}

		if (!m_settings.get_bool(settings_pack::enable_incoming_utp)
			&& is_utp(s))
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("<== INCOMING CONNECTION [ rejected uTP connection ]");
#endif
			if (m_alerts.should_post<peer_blocked_alert>())
				m_alerts.emplace_alert<peer_blocked_alert>(torrent_handle()
					, endp, peer_blocked_alert::utp_disabled);
			return;
		}

		if (!m_settings.get_bool(settings_pack::enable_incoming_tcp)
			&& std::get_if<tcp::socket>(&s))
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("<== INCOMING CONNECTION [ rejected TCP connection ]");
#endif
			if (m_alerts.should_post<peer_blocked_alert>())
				m_alerts.emplace_alert<peer_blocked_alert>(torrent_handle()
					, endp, peer_blocked_alert::tcp_disabled);
			return;
		}

		// if there are outgoing interfaces specified, verify this
		// peer is correctly bound to one of them
		if (!m_settings.get_str(settings_pack::outgoing_interfaces).empty())
		{
			tcp::endpoint local = s.local_endpoint(ec);
			if (ec)
			{
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("<== INCOMING CONNECTION [ rejected connection: %s ]"
						, print_error(ec).c_str());
				}
#endif
				return;
			}

			if (!verify_incoming_interface(local.address()))
			{
#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("<== INCOMING CONNECTION [ rejected, local interface has incoming connections disabled: %s ]"
						, local.address().to_string().c_str());
				}
#endif
				if (m_alerts.should_post<peer_blocked_alert>())
					m_alerts.emplace_alert<peer_blocked_alert>(torrent_handle()
						, endp, peer_blocked_alert::invalid_local_interface);
				return;
			}
			if (!verify_bound_address(local.address(), is_utp(s), ec))
			{
				if (ec)
				{
#ifndef TORRENT_DISABLE_LOGGING
					if (should_log())
					{
						session_log("<== INCOMING CONNECTION [ rejected, not allowed local interface: %s ]"
							, print_error(ec).c_str());
					}
#endif
					return;
				}

#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log("<== INCOMING CONNECTION [ rejected, not allowed local interface: %s ]"
						, local.address().to_string().c_str());
				}
#endif
				if (m_alerts.should_post<peer_blocked_alert>())
					m_alerts.emplace_alert<peer_blocked_alert>(torrent_handle()
						, endp, peer_blocked_alert::invalid_local_interface);
				return;
			}
		}

		// local addresses do not count, since it's likely
		// coming from our own client through local service discovery
		// and it does not reflect whether or not a router is open
		// for incoming connections or not.
		if (!is_local(endp.address()))
			m_stats_counters.set_value(counters::has_incoming_connections, 1);

		// this filter is ignored if a single torrent
		// is set to ignore the filter, since this peer might be
		// for that torrent
		if (m_stats_counters[counters::non_filter_torrents] == 0
			&& m_ip_filter
			&& (m_ip_filter->access(endp.address()) & ip_filter::blocked))
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("<== INCOMING CONNECTION [ filtered blocked ip ]");
#endif
			if (m_alerts.should_post<peer_blocked_alert>())
				m_alerts.emplace_alert<peer_blocked_alert>(torrent_handle()
					, endp, peer_blocked_alert::ip_filter);
			return;
		}

		// check if we have any active torrents
		// if we don't reject the connection
		if (m_torrents.empty())
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("<== INCOMING CONNECTION [ rejected, there are no torrents ]");
#endif
			return;
		}

		// figure out which peer classes this is connections has,
		// to get connection_limit_factor
		peer_class_set pcs;
		set_peer_classes(&pcs, endp.address(), socket_type_idx(s));
		int connection_limit_factor = 0;
		for (int i = 0; i < pcs.num_classes(); ++i)
		{
			peer_class_t pc = pcs.class_at(i);
			if (m_classes.at(pc) == nullptr) continue;
			int f = m_classes.at(pc)->connection_limit_factor;
			if (connection_limit_factor < f) connection_limit_factor = f;
		}
		if (connection_limit_factor == 0) connection_limit_factor = 100;

		std::int64_t limit = m_settings.get_int(settings_pack::connections_limit);
		limit = limit * 100 / connection_limit_factor;

		// don't allow more connections than the max setting
		// weighed by the peer class' setting
		bool reject = num_connections() >= limit + m_settings.get_int(settings_pack::connections_slack);

		if (reject)
		{
			if (m_alerts.should_post<peer_disconnected_alert>())
			{
				m_alerts.emplace_alert<peer_disconnected_alert>(torrent_handle(), endp, peer_id()
						, operation_t::bittorrent, socket_type_idx(s)
						, error_code(errors::too_many_connections)
						, close_reason_t::none);
			}
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("<== INCOMING CONNECTION [ connections limit exceeded, conns: %d, limit: %d, slack: %d ]"
					, num_connections(), m_settings.get_int(settings_pack::connections_limit)
					, m_settings.get_int(settings_pack::connections_slack));
			}
#endif
			return;
		}

		// if we don't have any active torrents, there's no
		// point in accepting this connection. If, however,
		// the setting to start up queued torrents when they
		// get an incoming connection is enabled, we cannot
		// perform this check.
		if (!m_settings.get_bool(settings_pack::incoming_starts_queued_torrents))
		{
			bool has_active_torrent = std::any_of(m_torrents.begin(), m_torrents.end()
				, [](std::shared_ptr<torrent> const& i)
				{ return !i->is_torrent_paused(); });
			if (!has_active_torrent)
			{
#ifndef TORRENT_DISABLE_LOGGING
				session_log("<== INCOMING CONNECTION [ rejected, no active torrents ]");
#endif
				return;
			}
		}

		m_stats_counters.inc_stats_counter(counters::incoming_connections);

		if (m_alerts.should_post<incoming_connection_alert>())
			m_alerts.emplace_alert<incoming_connection_alert>(socket_type_idx(s), endp);

		peer_connection_args pack{
			this
			, &m_settings
			, &m_stats_counters
			, m_disk_thread.get()
			, &m_io_context
			, std::weak_ptr<torrent>()
			, std::move(s)
			, endp
			, nullptr
			, aux::generate_peer_id(m_settings)
		};

		std::shared_ptr<peer_connection> c
			= std::make_shared<bt_peer_connection>(pack);

		if (!c->is_disconnecting())
		{
			// in case we've exceeded the limit, let this peer know that
			// as soon as it's received the handshake, it needs to either
			// disconnect or pick another peer to disconnect
			if (num_connections() >= limit)
				c->peer_exceeds_limit();

			TORRENT_ASSERT(!c->m_in_constructor);
			// removing a peer may not throw an exception, so prepare for this
			// connection to be added to the undead peers now.
			m_undead_peers.reserve(m_undead_peers.size() + m_connections.size() + 1);
			m_connections.insert(c);
			c->start();
		}
	}

	void session_impl::close_connection(peer_connection* p) noexcept
	{
		TORRENT_ASSERT(is_single_thread());
		std::shared_ptr<peer_connection> sp(p->self());

		TORRENT_ASSERT(p->is_disconnecting());

		auto const i = m_connections.find(sp);
		// make sure the next disk peer round-robin cursor stays valid
		if (i != m_connections.end())
		{
			m_connections.erase(i);

			TORRENT_ASSERT(std::find(m_undead_peers.begin()
				, m_undead_peers.end(), sp) == m_undead_peers.end());

			// someone else is holding a reference, it's important that
			// it's destructed from the network thread. Make sure the
			// last reference is held by the network thread.
			TORRENT_ASSERT_VAL(m_undead_peers.capacity() > m_undead_peers.size()
				, m_undead_peers.capacity());
			if (sp.use_count() > 2)
				m_undead_peers.push_back(sp);
		}
	}

#if TORRENT_ABI_VERSION == 1
	peer_id session_impl::deprecated_get_peer_id() const
	{
		return aux::generate_peer_id(m_settings);
	}
#endif

	int session_impl::next_port() const
	{
		int start = m_settings.get_int(settings_pack::outgoing_port);
		int num = m_settings.get_int(settings_pack::num_outgoing_ports);
		std::pair<int, int> out_ports(start, start + num);
		if (m_next_port < out_ports.first || m_next_port > out_ports.second)
			m_next_port = out_ports.first;

		int port = m_next_port;
		++m_next_port;
		if (m_next_port > out_ports.second) m_next_port = out_ports.first;
#ifndef TORRENT_DISABLE_LOGGING
		session_log(" *** BINDING OUTGOING CONNECTION [ port: %d ]", port);
#endif
		return port;
	}

	int session_impl::rate_limit(peer_class_t c, int channel) const
	{
		TORRENT_ASSERT(channel >= 0 && channel <= 1);
		if (channel < 0 || channel > 1) return 0;

		peer_class const* pc = m_classes.at(c);
		if (pc == nullptr) return 0;
		return pc->channel[channel].throttle();
	}

	int session_impl::upload_rate_limit(peer_class_t c) const
	{
		return rate_limit(c, peer_connection::upload_channel);
	}

	int session_impl::download_rate_limit(peer_class_t c) const
	{
		return rate_limit(c, peer_connection::download_channel);
	}

	void session_impl::set_rate_limit(peer_class_t c, int channel, int limit)
	{
		TORRENT_ASSERT(is_single_thread());
		TORRENT_ASSERT(limit >= -1);
		TORRENT_ASSERT(channel >= 0 && channel <= 1);

		if (channel < 0 || channel > 1) return;

		peer_class* pc = m_classes.at(c);
		if (pc == nullptr) return;
		if (limit <= 0) limit = 0;
		else limit = std::min(limit, std::numeric_limits<int>::max() - 1);
		pc->channel[channel].throttle(limit);
	}

	void session_impl::set_upload_rate_limit(peer_class_t c, int limit)
	{
		set_rate_limit(c, peer_connection::upload_channel, limit);
	}

	void session_impl::set_download_rate_limit(peer_class_t c, int limit)
	{
		set_rate_limit(c, peer_connection::download_channel, limit);
	}

#if TORRENT_USE_ASSERTS
	bool session_impl::has_peer(peer_connection const* p) const
	{
		TORRENT_ASSERT(is_single_thread());
		return std::any_of(m_connections.begin(), m_connections.end()
			, [p] (std::shared_ptr<peer_connection> const& pr)
			{ return pr.get() == p; });
	}

	bool session_impl::any_torrent_has_peer(peer_connection const* p) const
	{
		for (auto& pe : m_torrents)
			if (pe->has_peer(p)) return true;
		return false;
	}

	bool session_impl::verify_queue_position(torrent const* t, queue_position_t const pos)
	{
		return m_download_queue.end_index() > pos && m_download_queue[pos] == t;
	}
#endif

	void session_impl::sent_bytes(int bytes_payload, int bytes_protocol)
	{
		TORRENT_ASSERT(bytes_payload >= 0);
		TORRENT_ASSERT(bytes_protocol >= 0);
		m_stats_counters.inc_stats_counter(counters::sent_bytes
			, bytes_payload + bytes_protocol);
		m_stats_counters.inc_stats_counter(counters::sent_payload_bytes
			, bytes_payload);

		m_stat.sent_bytes(bytes_payload, bytes_protocol);
	}

	void session_impl::received_bytes(int bytes_payload, int bytes_protocol)
	{
		TORRENT_ASSERT(bytes_payload >= 0);
		TORRENT_ASSERT(bytes_protocol >= 0);
		m_stats_counters.inc_stats_counter(counters::recv_bytes
			, bytes_payload + bytes_protocol);
		m_stats_counters.inc_stats_counter(counters::recv_payload_bytes
			, bytes_payload);

		m_stat.received_bytes(bytes_payload, bytes_protocol);
	}

	void session_impl::trancieve_ip_packet(int bytes, bool ipv6)
	{
		TORRENT_ASSERT(bytes >= 0);
		// one TCP/IP packet header for the packet
		// sent or received, and one for the ACK
		// The IPv4 header is 20 bytes
		// and IPv6 header is 40 bytes
		int const header = (ipv6 ? 40 : 20) + 20;
		int const mtu = 1500;
		int const packet_size = mtu - header;
		int const overhead = std::max(1, (bytes + packet_size - 1) / packet_size) * header;
		m_stats_counters.inc_stats_counter(counters::sent_ip_overhead_bytes
			, overhead);
		m_stats_counters.inc_stats_counter(counters::recv_ip_overhead_bytes
			, overhead);

		m_stat.trancieve_ip_packet(bytes, ipv6);
	}

	void session_impl::sent_syn(bool ipv6)
	{
		int const overhead = ipv6 ? 60 : 40;
		m_stats_counters.inc_stats_counter(counters::sent_ip_overhead_bytes
			, overhead);

		m_stat.sent_syn(ipv6);
	}

	void session_impl::received_synack(bool ipv6)
	{
		int const overhead = ipv6 ? 60 : 40;
		m_stats_counters.inc_stats_counter(counters::sent_ip_overhead_bytes
			, overhead);
		m_stats_counters.inc_stats_counter(counters::recv_ip_overhead_bytes
			, overhead);

		m_stat.received_synack(ipv6);
	}

	void session_impl::on_tick(error_code const& e)
	{
		COMPLETE_ASYNC("session_impl::on_tick");
		m_stats_counters.inc_stats_counter(counters::on_tick_counter);

		TORRENT_ASSERT(is_single_thread());

		time_point const now = aux::time_now();

		// remove undead peers that only have this list as their reference keeping them alive
		if (!m_undead_peers.empty())
		{
			auto const remove_it = std::remove_if(m_undead_peers.begin(), m_undead_peers.end()
				, [](std::shared_ptr<peer_connection>& ptr) { return ptr.use_count() == 1; });
			m_undead_peers.erase(remove_it, m_undead_peers.end());
			if (m_undead_peers.empty())
			{
				// we just removed our last "undead" peer (i.e. a peer connection
				// that had some external reference to it). It's now safe to
				// shut-down
				if (m_abort)
				{
					post(m_io_context, make_handler([this] { abort_stage2(); }
						, m_abort_handler_storage, *this));
				}
			}
		}

// too expensive
//		INVARIANT_CHECK;

		// we have to keep ticking the utp socket manager
		// until they're all closed
		// we also have to keep updating the aux time while
		// there are outstanding announces
		if (m_abort)
		{
			if (m_utp_socket_manager.num_sockets() == 0
#ifdef TORRENT_SSL_PEERS
				&& m_ssl_utp_socket_manager.num_sockets() == 0
#endif
				&& m_undead_peers.empty()
				&& m_tracker_manager.empty())
			{
				// this is where shutdown completes. We won't issue another
				// on_tick()
				return;
			}
#if defined TORRENT_ASIO_DEBUGGING
			std::fprintf(stderr, "uTP sockets: %d ssl-uTP sockets: %d undead-peers left: %d\n"
				, m_utp_socket_manager.num_sockets()
#ifdef TORRENT_SSL_PEERS
				, m_ssl_utp_socket_manager.num_sockets()
#else
				, 0
#endif
				, int(m_undead_peers.size()));
#endif
		}

		if (e && e != boost::asio::error::operation_aborted)
		{
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
				session_log("*** TICK TIMER FAILED %s", e.message().c_str());
#endif
			std::abort();
		}

		ADD_OUTSTANDING_ASYNC("session_impl::on_tick");
		milliseconds const tick_interval(m_abort ? 100 : m_settings.get_int(settings_pack::tick_interval));
		m_timer.expires_at(now + tick_interval);
		m_timer.async_wait(aux::make_handler([this](error_code const& err)
		{ wrap(&session_impl::on_tick, err); }, m_tick_handler_storage, *this));

		m_download_rate.update_quotas(now - m_last_tick);
		m_upload_rate.update_quotas(now - m_last_tick);

		m_last_tick = now;

		m_utp_socket_manager.tick(now);
#ifdef TORRENT_SSL_PEERS
		m_ssl_utp_socket_manager.tick(now);
#endif

		// only tick the following once per second
		if (now - m_last_second_tick < seconds(1)) return;

#ifndef TORRENT_DISABLE_DHT
		if (m_dht
			&& m_dht_interval_update_torrents < 40
			&& m_dht_interval_update_torrents != int(m_torrents.size()))
			update_dht_announce_interval();
#endif

		m_utp_socket_manager.decay();
#ifdef TORRENT_SSL_PEERS
		m_ssl_utp_socket_manager.decay();
#endif

		int const tick_interval_ms = aux::numeric_cast<int>(total_milliseconds(now - m_last_second_tick));
		m_last_second_tick = now;

		std::int32_t const stime = session_time();
		if (stime > 65000)
		{
			// we're getting close to the point where our timestamps
			// in torrent_peer are wrapping. We need to step all counters back
			// four hours. This means that any timestamp that refers to a time
			// more than 18.2 - 4 = 14.2 hours ago, will be incremented to refer to
			// 14.2 hours ago.

			m_created += hours(4);

			constexpr int four_hours = 60 * 60 * 4;
			for (auto& i : m_torrents)
			{
				i->step_session_time(four_hours);
			}
		}

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto& ext : m_ses_extensions[plugins_tick_idx])
		{
			ext->on_tick();
		}
#endif

		// don't do any of the following while we're shutting down
		if (m_abort) return;

		switch (m_settings.get_int(settings_pack::mixed_mode_algorithm))
		{
			case settings_pack::prefer_tcp:
				set_upload_rate_limit(m_tcp_peer_class, 0);
				set_download_rate_limit(m_tcp_peer_class, 0);
				break;
			case settings_pack::peer_proportional:
				{
					int num_peers[2][2] = {{0, 0}, {0, 0}};
					for (auto const& i : m_connections)
					{
						peer_connection& p = *i;
						if (p.in_handshake()) continue;
						int protocol = 0;
						if (is_utp(p.get_socket())) protocol = 1;

						if (p.download_queue().size() + p.request_queue().size() > 0)
							++num_peers[protocol][peer_connection::download_channel];
						if (!p.upload_queue().empty())
							++num_peers[protocol][peer_connection::upload_channel];
					}

					peer_class* pc = m_classes.at(m_tcp_peer_class);
					bandwidth_channel* tcp_channel = pc->channel;
					int stat_rate[] = {m_stat.upload_rate(), m_stat.download_rate() };
					// never throttle below this
					int lower_limit[] = {5000, 30000};

					for (int i = 0; i < 2; ++i)
					{
						// if there are no uploading uTP peers, don't throttle TCP up
						if (num_peers[1][i] == 0)
						{
							tcp_channel[i].throttle(0);
						}
						else
						{
							if (num_peers[0][i] == 0) num_peers[0][i] = 1;
							int total_peers = num_peers[0][i] + num_peers[1][i];
							// this are 64 bits since it's multiplied by the number
							// of peers, which otherwise might overflow an int
							std::int64_t rate = stat_rate[i];
							tcp_channel[i].throttle(std::max(int(rate * num_peers[0][i] / total_peers), lower_limit[i]));
						}
					}
				}
				break;
		}

		// --------------------------------------------------------------
		// auto managed torrent
		// --------------------------------------------------------------
		if (!m_paused) m_auto_manage_time_scaler--;
		if (m_auto_manage_time_scaler < 0)
		{
			m_auto_manage_time_scaler = settings().get_int(settings_pack::auto_manage_interval);
			recalculate_auto_managed_torrents();
		}

		// --------------------------------------------------------------
		// check for incoming connections that might have timed out
		// --------------------------------------------------------------

		for (auto i = m_connections.begin(); i != m_connections.end();)
		{
			peer_connection* p = (*i).get();
			++i;
			// ignore connections that already have a torrent, since they
			// are ticked through the torrents' second_tick
			if (!p->associated_torrent().expired()) continue;

			// TODO: have a separate list for these connections, instead of having to loop through all of them
			int timeout = m_settings.get_int(settings_pack::handshake_timeout);
#if TORRENT_USE_I2P
			timeout *= is_i2p(p->get_socket()) ? 4 : 1;
#endif
			if (m_last_tick - p->connected_time () > seconds(timeout))
				p->disconnect(errors::timed_out, operation_t::bittorrent);
		}

		// --------------------------------------------------------------
		// second_tick every torrent (that wants it)
		// --------------------------------------------------------------

#if TORRENT_DEBUG_STREAMING > 0
		std::printf("\033[2J\033[0;0H");
#endif

		aux::vector<torrent*>& want_tick = m_torrent_lists[torrent_want_tick];
		for (int i = 0; i < int(want_tick.size()); ++i)
		{
			torrent& t = *want_tick[i];
			TORRENT_ASSERT(t.want_tick());
			TORRENT_ASSERT(!t.is_aborted());

			t.second_tick(tick_interval_ms);

			// if the call to second_tick caused the torrent
			// to no longer want to be ticked (i.e. it was
			// removed from the list) we need to back up the counter
			// to not miss the torrent after it
			if (!t.want_tick()) --i;
		}

		// TODO: this should apply to all bandwidth channels
		if (m_settings.get_bool(settings_pack::rate_limit_ip_overhead))
		{
			int const up_limit = upload_rate_limit(m_global_class);
			int const down_limit = download_rate_limit(m_global_class);

			if (down_limit > 0
				&& m_stat.download_ip_overhead() >= down_limit
				&& m_alerts.should_post<performance_alert>())
			{
				m_alerts.emplace_alert<performance_alert>(torrent_handle()
					, performance_alert::download_limit_too_low);
			}

			if (up_limit > 0
				&& m_stat.upload_ip_overhead() >= up_limit
				&& m_alerts.should_post<performance_alert>())
			{
				m_alerts.emplace_alert<performance_alert>(torrent_handle()
					, performance_alert::upload_limit_too_low);
			}
		}

#if TORRENT_ABI_VERSION == 1
		m_peak_up_rate = std::max(m_stat.upload_rate(), m_peak_up_rate);
#endif

		m_stat.second_tick(tick_interval_ms);

		// --------------------------------------------------------------
		// scrape paused torrents that are auto managed
		// (unless the session is paused)
		// --------------------------------------------------------------
		if (!m_paused)
		{
			INVARIANT_CHECK;
			--m_auto_scrape_time_scaler;
			if (m_auto_scrape_time_scaler <= 0)
			{
				aux::vector<torrent*>& want_scrape = m_torrent_lists[torrent_want_scrape];
				m_auto_scrape_time_scaler = m_settings.get_int(settings_pack::auto_scrape_interval)
					/ std::max(1, int(want_scrape.size()));
				if (m_auto_scrape_time_scaler < m_settings.get_int(settings_pack::auto_scrape_min_interval))
					m_auto_scrape_time_scaler = m_settings.get_int(settings_pack::auto_scrape_min_interval);

				if (!want_scrape.empty() && !m_abort)
				{
					if (m_next_scrape_torrent >= int(want_scrape.size()))
						m_next_scrape_torrent = 0;

					torrent& t = *want_scrape[m_next_scrape_torrent];
					TORRENT_ASSERT(t.is_paused() && t.is_auto_managed());

					// false means it's not triggered by the user, but automatically
					// by libtorrent
					t.scrape_tracker(-1, false);

					++m_next_scrape_torrent;
					if (m_next_scrape_torrent >= int(want_scrape.size()))
						m_next_scrape_torrent = 0;

				}
			}
		}

		// --------------------------------------------------------------
		// connect new peers
		// --------------------------------------------------------------

		try_connect_more_peers();

		// --------------------------------------------------------------
		// unchoke set calculations
		// --------------------------------------------------------------
		m_unchoke_time_scaler--;
		if (m_unchoke_time_scaler <= 0 && !m_connections.empty())
		{
			m_unchoke_time_scaler = settings().get_int(settings_pack::unchoke_interval);
			recalculate_unchoke_slots();
		}

		// --------------------------------------------------------------
		// optimistic unchoke calculation
		// --------------------------------------------------------------
		m_optimistic_unchoke_time_scaler--;
		if (m_optimistic_unchoke_time_scaler <= 0)
		{
			m_optimistic_unchoke_time_scaler
				= settings().get_int(settings_pack::optimistic_unchoke_interval);
			recalculate_optimistic_unchoke_slots();
		}

		// --------------------------------------------------------------
		// disconnect peers when we have too many
		// --------------------------------------------------------------
		--m_disconnect_time_scaler;
		if (m_disconnect_time_scaler <= 0)
		{
			m_disconnect_time_scaler = m_settings.get_int(settings_pack::peer_turnover_interval);

			// if the connections_limit is too low, the disconnect
			// logic is disabled, since it is too disruptive
			if (m_settings.get_int(settings_pack::connections_limit) > 5)
			{
				int const limit = std::min(m_settings.get_int(settings_pack::connections_limit)
					, std::numeric_limits<int>::max() / 100);
				int const cutoff = std::min(m_settings.get_int(settings_pack::peer_turnover_cutoff), 100);
				if (num_connections() >= limit * cutoff / 100 && !m_torrents.empty())
				{
					// every 90 seconds, disconnect the worst peers
					// if we have reached the connection limit
					auto const i = std::max_element(m_torrents.begin(), m_torrents.end()
						, [] (std::shared_ptr<torrent> const& lhs, std::shared_ptr<torrent> const& rhs)
						{ return lhs->num_peers() < rhs->num_peers(); });

					TORRENT_ASSERT(i != m_torrents.end());
					int const peers_to_disconnect = std::min(std::max(
						(*i)->num_peers() * m_settings.get_int(settings_pack::peer_turnover) / 100, 1)
						, (*i)->num_connect_candidates());
					(*i)->disconnect_peers(peers_to_disconnect
						, error_code(errors::optimistic_disconnect));
				}
				else
				{
					// if we haven't reached the global max. see if any torrent
					// has reached its local limit
					for (auto const& t : m_torrents)
					{
						// ths disconnect logic is disabled for torrents with
						// too low connection limit
						int const max = std::min(t->max_connections()
							, std::numeric_limits<int>::max() / 100);
						if (t->num_peers() < max * cutoff / 100 || max < 6)
							continue;

						int const peers_to_disconnect = std::min(std::max(t->num_peers()
							* m_settings.get_int(settings_pack::peer_turnover) / 100, 1)
							, t->num_connect_candidates());
						t->disconnect_peers(peers_to_disconnect, errors::optimistic_disconnect);
					}
				}
			}
		}
	}

	void session_impl::received_buffer(int s)
	{
		int index = std::min(aux::log2p1(std::uint32_t(s >> 3)), 17);
		m_stats_counters.inc_stats_counter(counters::socket_recv_size3 + index);
	}

	void session_impl::sent_buffer(int s)
	{
		int index = std::min(aux::log2p1(std::uint32_t(s >> 3)), 17);
		m_stats_counters.inc_stats_counter(counters::socket_send_size3 + index);
	}

	void session_impl::prioritize_connections(std::weak_ptr<torrent> t)
	{
		m_prio_torrents.emplace_back(t, 10);
	}

#ifndef TORRENT_DISABLE_DHT

	void session_impl::add_dht_node(udp::endpoint const& n)
	{
		TORRENT_ASSERT(is_single_thread());
		if (m_dht) m_dht->add_node(n);
		else m_dht_nodes.push_back(n);
	}

	bool session_impl::has_dht() const
	{
		return m_dht != nullptr;
	}

	void session_impl::prioritize_dht(std::weak_ptr<torrent> t)
	{
		TORRENT_ASSERT(!m_abort);
		if (m_abort) return;

		TORRENT_ASSERT(m_dht);
		m_dht_torrents.push_back(t);
#ifndef TORRENT_DISABLE_LOGGING
		std::shared_ptr<torrent> tor = t.lock();
		if (tor && should_log())
			session_log("prioritizing DHT announce: \"%s\"", tor->name().c_str());
#endif
		// trigger a DHT announce right away if we just added a new torrent and
		// there's no back-log. in the timer handler, as long as there are more
		// high priority torrents to be announced to the DHT, it will keep the
		// timer interval short until all torrents have been announced.
		if (m_dht_torrents.size() == 1)
		{
			ADD_OUTSTANDING_ASYNC("session_impl::on_dht_announce");
			m_dht_announce_timer.expires_after(seconds(0));
			m_dht_announce_timer.async_wait([this](error_code const& err) {
				wrap(&session_impl::on_dht_announce, err); });
		}
	}

	void session_impl::on_dht_announce(error_code const& e)
	{
		COMPLETE_ASYNC("session_impl::on_dht_announce");
		TORRENT_ASSERT(is_single_thread());
		if (e)
		{
#ifndef TORRENT_DISABLE_LOGGING
			if (should_log())
			{
				session_log("aborting DHT announce timer (%d): %s"
					, e.value(), e.message().c_str());
			}
#endif
			return;
		}

		if (m_abort)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("aborting DHT announce timer: m_abort set");
#endif
			return;
		}

		if (!m_dht)
		{
			m_dht_torrents.clear();
			return;
		}

		TORRENT_ASSERT(m_dht);

		// announce to DHT every 15 minutes
		int delay = std::max(m_settings.get_int(settings_pack::dht_announce_interval)
			/ std::max(int(m_torrents.size()), 1), 1);

		if (!m_dht_torrents.empty())
		{
			// we have prioritized torrents that need
			// an initial DHT announce. Don't wait too long
			// until we announce those.
			delay = std::min(4, delay);
		}

		ADD_OUTSTANDING_ASYNC("session_impl::on_dht_announce");
		m_dht_announce_timer.expires_after(seconds(delay));
		m_dht_announce_timer.async_wait([this](error_code const& err)
			{ wrap(&session_impl::on_dht_announce, err); });

		if (!m_dht_torrents.empty())
		{
			std::shared_ptr<torrent> t;
			do
			{
				t = m_dht_torrents.front().lock();
				m_dht_torrents.pop_front();
			} while (!t && !m_dht_torrents.empty());

			if (t)
			{
				t->dht_announce();
				return;
			}
		}
		if (m_torrents.empty()) return;

		if (m_next_dht_torrent >= m_torrents.size())
			m_next_dht_torrent = 0;
		m_torrents[m_next_dht_torrent]->dht_announce();
		// TODO: 2 make a list for torrents that want to be announced on the DHT so we
		// don't have to loop over all torrents, just to find the ones that want to announce
		++m_next_dht_torrent;
		if (m_next_dht_torrent >= m_torrents.size())
			m_next_dht_torrent = 0;
	}
#endif

	void session_impl::on_lsd_announce(error_code const& e)
	{
		COMPLETE_ASYNC("session_impl::on_lsd_announce");
		m_stats_counters.inc_stats_counter(counters::on_lsd_counter);
		TORRENT_ASSERT(is_single_thread());
		if (e) return;

		if (m_abort) return;

		ADD_OUTSTANDING_ASYNC("session_impl::on_lsd_announce");
		// announce on local network every 5 minutes
		int const delay = std::max(m_settings.get_int(settings_pack::local_service_announce_interval)
			/ std::max(int(m_torrents.size()), 1), 1);
		m_lsd_announce_timer.expires_after(seconds(delay));
		m_lsd_announce_timer.async_wait([this](error_code const& err) {
			wrap(&session_impl::on_lsd_announce, err); });

		if (m_torrents.empty()) return;

		if (m_next_lsd_torrent >= m_torrents.size())
			m_next_lsd_torrent = 0;
		m_torrents[m_next_lsd_torrent]->lsd_announce();
		++m_next_lsd_torrent;
		if (m_next_lsd_torrent >= m_torrents.size())
			m_next_lsd_torrent = 0;
	}

	void session_impl::auto_manage_checking_torrents(std::vector<torrent*>& list
		, int& limit)
	{
		for (auto& t : list)
		{
			TORRENT_ASSERT(t->state() == torrent_status::checking_files);
			TORRENT_ASSERT(t->is_auto_managed());
			if (limit <= 0)
			{
				t->pause();
			}
			else
			{
				t->resume();
				if (!t->should_check_files()) continue;
				t->start_checking();
				--limit;
			}
		}
	}

	void session_impl::auto_manage_torrents(std::vector<torrent*>& list
		, int& dht_limit, int& tracker_limit
		, int& lsd_limit, int& hard_limit, int type_limit)
	{
		for (auto& t : list)
		{
			TORRENT_ASSERT(t->state() != torrent_status::checking_files);

			// inactive torrents don't count (and if you configured them to do so,
			// the torrent won't say it's inactive)
			if (hard_limit > 0 && t->is_inactive())
			{
				t->set_announce_to_dht(--dht_limit >= 0);
				t->set_announce_to_trackers(--tracker_limit >= 0);
				t->set_announce_to_lsd(--lsd_limit >= 0);

				--hard_limit;
#ifndef TORRENT_DISABLE_LOGGING
				if (t->is_torrent_paused())
					t->log_to_all_peers("auto manager starting (inactive) torrent");
#endif
				t->set_paused(false);
				continue;
			}

			if (type_limit > 0 && hard_limit > 0)
			{
				t->set_announce_to_dht(--dht_limit >= 0);
				t->set_announce_to_trackers(--tracker_limit >= 0);
				t->set_announce_to_lsd(--lsd_limit >= 0);

				--hard_limit;
				--type_limit;
#ifndef TORRENT_DISABLE_LOGGING
				if (t->is_torrent_paused())
					t->log_to_all_peers("auto manager starting torrent");
#endif
				t->set_paused(false);
				continue;
			}

#ifndef TORRENT_DISABLE_LOGGING
			if (!t->is_torrent_paused())
				t->log_to_all_peers("auto manager pausing torrent");
#endif
			// use graceful pause for auto-managed torrents
			t->set_paused(true, torrent_handle::graceful_pause
				| torrent_handle::clear_disk_cache);
			t->set_announce_to_dht(false);
			t->set_announce_to_trackers(false);
			t->set_announce_to_lsd(false);
		}
	}

	int session_impl::get_int_setting(int n) const
	{
		int const v = settings().get_int(n);
		if (v < 0) return std::numeric_limits<int>::max();
		return v;
	}

	void session_impl::recalculate_auto_managed_torrents()
	{
		INVARIANT_CHECK;

		m_last_auto_manage = time_now();
		m_need_auto_manage = false;

		if (m_paused) return;

		// make copies of the lists of torrents that we want to consider for auto
		// management. We need copies because they will be sorted.
		std::vector<torrent*> checking
			= torrent_list(session_interface::torrent_checking_auto_managed);
		std::vector<torrent*> downloaders
			= torrent_list(session_interface::torrent_downloading_auto_managed);
		std::vector<torrent*> seeds
			= torrent_list(session_interface::torrent_seeding_auto_managed);

		// these counters are set to the number of torrents
		// of each kind we're allowed to have active
		int downloading_limit = get_int_setting(settings_pack::active_downloads);
		int seeding_limit = get_int_setting(settings_pack::active_seeds);
		int checking_limit = get_int_setting(settings_pack::active_checking);
		int dht_limit = get_int_setting(settings_pack::active_dht_limit);
		int tracker_limit = get_int_setting(settings_pack::active_tracker_limit);
		int lsd_limit = get_int_setting(settings_pack::active_lsd_limit);
		int hard_limit = get_int_setting(settings_pack::active_limit);

		// if hard_limit is <= 0, all torrents in these lists should be paused.
		// The order is not relevant
		if (hard_limit > 0)
		{
			// we only need to sort the first n torrents here, where n is the number
			// of checking torrents we allow. The rest of the list is still used to
			// make sure the remaining torrents are paused, but their order is not
			// relevant
			std::partial_sort(checking.begin(), checking.begin() +
				std::min(checking_limit, int(checking.size())), checking.end()
				, [](torrent const* lhs, torrent const* rhs)
				{ return lhs->sequence_number() < rhs->sequence_number(); });

			std::partial_sort(downloaders.begin(), downloaders.begin() +
				std::min(hard_limit, int(downloaders.size())), downloaders.end()
				, [](torrent const* lhs, torrent const* rhs)
				{ return lhs->sequence_number() < rhs->sequence_number(); });

			std::partial_sort(seeds.begin(), seeds.begin() +
				std::min(hard_limit, int(seeds.size())), seeds.end()
				, [this](torrent const* lhs, torrent const* rhs)
				{ return lhs->seed_rank(m_settings) > rhs->seed_rank(m_settings); });
		}

		auto_manage_checking_torrents(checking, checking_limit);

		if (settings().get_bool(settings_pack::auto_manage_prefer_seeds))
		{
			auto_manage_torrents(seeds, dht_limit, tracker_limit, lsd_limit
				, hard_limit, seeding_limit);
			auto_manage_torrents(downloaders, dht_limit, tracker_limit, lsd_limit
				, hard_limit, downloading_limit);
		}
		else
		{
			auto_manage_torrents(downloaders, dht_limit, tracker_limit, lsd_limit
				, hard_limit, downloading_limit);
			auto_manage_torrents(seeds, dht_limit, tracker_limit, lsd_limit
				, hard_limit, seeding_limit);
		}
	}

	namespace {
#ifndef TORRENT_DISABLE_EXTENSIONS
		uint64_t const priority_undetermined = std::numeric_limits<uint64_t>::max() - 1;
#endif

		struct opt_unchoke_candidate
		{
			explicit opt_unchoke_candidate(std::shared_ptr<peer_connection> const* tp)
				: peer(tp)
			{}

			std::shared_ptr<peer_connection> const* peer;
#ifndef TORRENT_DISABLE_EXTENSIONS
			// this is mutable because comparison functors passed to std::partial_sort
			// are not supposed to modify the elements they are sorting. Here the mutation
			// being applied is idempotent so it should not pose a problem.
			mutable uint64_t ext_priority = priority_undetermined;
#endif
		};

		struct last_optimistic_unchoke_cmp
		{
#ifndef TORRENT_DISABLE_EXTENSIONS
			explicit last_optimistic_unchoke_cmp(std::vector<std::shared_ptr<plugin>>& ps)
				: plugins(ps)
			{}

			std::vector<std::shared_ptr<plugin>>& plugins;
#endif

			uint64_t get_ext_priority(opt_unchoke_candidate const& peer) const
			{
#ifndef TORRENT_DISABLE_EXTENSIONS
				if (peer.ext_priority == priority_undetermined)
				{
					peer.ext_priority = std::numeric_limits<uint64_t>::max();
					for (auto& e : plugins)
					{
						uint64_t const priority = e->get_unchoke_priority(peer_connection_handle(*peer.peer));
						peer.ext_priority = std::min(priority, peer.ext_priority);
					}
				}
				return peer.ext_priority;
#else
				TORRENT_UNUSED(peer);
				return std::numeric_limits<uint64_t>::max();
#endif
			}

			bool operator()(opt_unchoke_candidate const& l
				, opt_unchoke_candidate const& r) const
			{
				torrent_peer const* pil = (*l.peer)->peer_info_struct();
				torrent_peer const* pir = (*r.peer)->peer_info_struct();
				if (pil->last_optimistically_unchoked
					!= pir->last_optimistically_unchoked)
				{
					return pil->last_optimistically_unchoked
						< pir->last_optimistically_unchoked;
				}
				else
				{
					return get_ext_priority(l) < get_ext_priority(r);
				}
			}
		};
	}

	void session_impl::recalculate_optimistic_unchoke_slots()
	{
		INVARIANT_CHECK;

		TORRENT_ASSERT(is_single_thread());
		if (m_stats_counters[counters::num_unchoke_slots] == 0) return;

		// if we unchoke everyone, skip this logic
		if (settings().get_int(settings_pack::choking_algorithm) == settings_pack::fixed_slots_choker
			&& settings().get_int(settings_pack::unchoke_slots_limit) < 0)
			return;

		std::vector<opt_unchoke_candidate> opt_unchoke;

		// collect the currently optimistically unchoked peers here, so we can
		// choke them when we've found new optimistic unchoke candidates.
		std::vector<torrent_peer*> prev_opt_unchoke;

		// TODO: 3 it would probably make sense to have a separate list of peers
		// that are eligible for optimistic unchoke, similar to the torrents
		// perhaps this could even iterate over the pool allocators of
		// torrent_peer objects. It could probably be done in a single pass and
		// collect the n best candidates. maybe just a queue of peers would make
		// even more sense, just pick the next peer in the queue for unchoking. It
		// would be O(1).
		for (auto const& i : m_connections)
		{
			peer_connection* const p = i.get();
			TORRENT_ASSERT(p);
			torrent_peer* pi = p->peer_info_struct();
			if (!pi) continue;
			if (pi->web_seed) continue;

			if (pi->optimistically_unchoked)
			{
				prev_opt_unchoke.push_back(pi);
			}

			torrent const* t = p->associated_torrent().lock().get();
			if (!t) continue;

			// TODO: 3 peers should know whether their torrent is paused or not,
			// instead of having to ask it over and over again
			if (t->is_paused()) continue;

			if (!p->is_connecting()
				&& !p->is_disconnecting()
				&& p->is_peer_interested()
				&& t->free_upload_slots()
				&& (p->is_choked() || pi->optimistically_unchoked)
				&& !p->ignore_unchoke_slots()
				&& t->valid_metadata())
			{
				opt_unchoke.emplace_back(&i);
			}
		}

		// find the peers that has been waiting the longest to be optimistically
		// unchoked

		int num_opt_unchoke = m_settings.get_int(settings_pack::num_optimistic_unchoke_slots);
		int const allowed_unchoke_slots = int(m_stats_counters[counters::num_unchoke_slots]);
		if (num_opt_unchoke == 0) num_opt_unchoke = std::max(1, allowed_unchoke_slots / 5);
		if (num_opt_unchoke > int(opt_unchoke.size())) num_opt_unchoke =
			int(opt_unchoke.size());

		// find the n best optimistic unchoke candidates
		std::partial_sort(opt_unchoke.begin()
			, opt_unchoke.begin() + num_opt_unchoke
			, opt_unchoke.end()
#ifndef TORRENT_DISABLE_EXTENSIONS
			, last_optimistic_unchoke_cmp(m_ses_extensions[plugins_optimistic_unchoke_idx])
#else
			, last_optimistic_unchoke_cmp()
#endif
			);

		// unchoke the first num_opt_unchoke peers in the candidate set
		// and make sure that the others are choked
		auto opt_unchoke_end = opt_unchoke.begin()
			+ num_opt_unchoke;

		for (auto i = opt_unchoke.begin(); i != opt_unchoke_end; ++i)
		{
			torrent_peer* pi = (*i->peer)->peer_info_struct();
			auto* const p = static_cast<peer_connection*>(pi->connection);
			if (pi->optimistically_unchoked)
			{
#ifndef TORRENT_DISABLE_LOGGING
					p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
						, "already unchoked | session-time: %d"
						, pi->last_optimistically_unchoked);
#endif
				TORRENT_ASSERT(!pi->connection->is_choked());
				// remove this peer from prev_opt_unchoke, to prevent us from
				// choking it later. This peer gets another round of optimistic
				// unchoke
				auto const existing =
					std::find(prev_opt_unchoke.begin(), prev_opt_unchoke.end(), pi);
				TORRENT_ASSERT(existing != prev_opt_unchoke.end());
				prev_opt_unchoke.erase(existing);
			}
			else
			{
				TORRENT_ASSERT(p->is_choked());
				std::shared_ptr<torrent> t = p->associated_torrent().lock();
				bool ret = t->unchoke_peer(*p, true);
				TORRENT_ASSERT(ret);
				if (ret)
				{
					pi->optimistically_unchoked = true;
					m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic);
					pi->last_optimistically_unchoked = std::uint16_t(session_time());
#ifndef TORRENT_DISABLE_LOGGING
					p->peer_log(peer_log_alert::info, "OPTIMISTIC UNCHOKE"
						, "session-time: %d", pi->last_optimistically_unchoked);
#endif
				}
			}
		}

		// now, choke all the previous optimistically unchoked peers
		for (torrent_peer* pi : prev_opt_unchoke)
		{
			TORRENT_ASSERT(pi->optimistically_unchoked);
			auto* const p = static_cast<peer_connection*>(pi->connection);
			std::shared_ptr<torrent> t = p->associated_torrent().lock();
			pi->optimistically_unchoked = false;
			m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
			t->choke_peer(*p);
		}

		// if we have too many unchoked peers now, we need to trigger the regular
		// choking logic to choke some
		if (m_stats_counters[counters::num_unchoke_slots]
			< m_stats_counters[counters::num_peers_up_unchoked_all])
		{
			m_unchoke_time_scaler = 0;
		}
	}

	void session_impl::try_connect_more_peers()
	{
		if (m_abort) return;

		if (num_connections() >= m_settings.get_int(settings_pack::connections_limit))
			return;

		// this is the maximum number of connections we will
		// attempt this tick
		int max_connections = m_settings.get_int(settings_pack::connection_speed);

		// this loop will "hand out" connection_speed to the torrents, in a round
		// robin fashion, so that every torrent is equally likely to connect to a
		// peer

		// boost connections are connections made by torrent connection
		// boost, which are done immediately on a tracker response. These
		// connections needs to be deducted from the regular connection attempt
		// quota for this tick
		if (m_boost_connections > 0)
		{
			if (m_boost_connections > max_connections)
			{
				m_boost_connections -= max_connections;
				max_connections = 0;
			}
			else
			{
				max_connections -= m_boost_connections;
				m_boost_connections = 0;
			}
		}

		// zero connections speeds are allowed, we just won't make any connections
		if (max_connections <= 0) return;

		// TODO: use a lower limit than m_settings.connections_limit
		// to allocate the to 10% or so of connection slots for incoming
		// connections
		// cap this at max - 1, since we may add one below
		int const limit = std::min(m_settings.get_int(settings_pack::connections_limit)
			- num_connections(), std::numeric_limits<int>::max() - 1);

		// this logic is here to smooth out the number of new connection
		// attempts over time, to prevent connecting a large number of
		// sockets, wait 10 seconds, and then try again
		if (m_settings.get_bool(settings_pack::smooth_connects) && max_connections > (limit+1) / 2)
			max_connections = (limit + 1) / 2;

		aux::vector<torrent*>& want_peers_download = m_torrent_lists[torrent_want_peers_download];
		aux::vector<torrent*>& want_peers_finished = m_torrent_lists[torrent_want_peers_finished];

		// if no torrent want any peers, just return
		if (want_peers_download.empty() && want_peers_finished.empty()) return;

		// if we don't have any connection attempt quota, return
		if (max_connections <= 0) return;

		int steps_since_last_connect = 0;
		int const num_torrents = int(want_peers_finished.size() + want_peers_download.size());
		for (;;)
		{
			if (m_next_downloading_connect_torrent >= int(want_peers_download.size()))
				m_next_downloading_connect_torrent = 0;

			if (m_next_finished_connect_torrent >= int(want_peers_finished.size()))
				m_next_finished_connect_torrent = 0;

			torrent* t = nullptr;
			// there are prioritized torrents. Pick one of those
			while (!m_prio_torrents.empty())
			{
				t = m_prio_torrents.front().first.lock().get();
				--m_prio_torrents.front().second;
				if (m_prio_torrents.front().second > 0
					&& t != nullptr
					&& t->want_peers()) break;
				m_prio_torrents.pop_front();
				t = nullptr;
			}

			if (t == nullptr)
			{
				if ((m_download_connect_attempts >= m_settings.get_int(
						settings_pack::connect_seed_every_n_download)
					&& !want_peers_finished.empty())
						|| want_peers_download.empty())
				{
					// pick a finished torrent to give a peer to
					t = want_peers_finished[m_next_finished_connect_torrent];
					TORRENT_ASSERT(t->want_peers_finished());
					m_download_connect_attempts = 0;
					++m_next_finished_connect_torrent;
				}
				else
				{
					// pick a downloading torrent to give a peer to
					t = want_peers_download[m_next_downloading_connect_torrent];
					TORRENT_ASSERT(t->want_peers_download());
					++m_download_connect_attempts;
					++m_next_downloading_connect_torrent;
				}
			}

			TORRENT_ASSERT(t->want_peers());
			TORRENT_ASSERT(!t->is_torrent_paused());

			if (t->try_connect_peer())
			{
				--max_connections;
				steps_since_last_connect = 0;
				m_stats_counters.inc_stats_counter(counters::connection_attempts);
			}

			++steps_since_last_connect;

			// if there are no more free connection slots, abort
			if (max_connections == 0) return;
			// there are no more torrents that want peers
			if (want_peers_download.empty() && want_peers_finished.empty()) break;
			// if we have gone a whole loop without
			// handing out a single connection, break
			if (steps_since_last_connect > num_torrents + 1) break;
			// maintain the global limit on number of connections
			if (num_connections() >= m_settings.get_int(settings_pack::connections_limit)) break;
		}
	}

	void session_impl::recalculate_unchoke_slots()
	{
		TORRENT_ASSERT(is_single_thread());

		time_point const now = aux::time_now();
		time_duration const unchoke_interval = now - m_last_choke;
		m_last_choke = now;

		// if we unchoke everyone, skip this logic
		if (settings().get_int(settings_pack::choking_algorithm) == settings_pack::fixed_slots_choker
			&& settings().get_int(settings_pack::unchoke_slots_limit) < 0)
		{
			m_stats_counters.set_value(counters::num_unchoke_slots, std::numeric_limits<int>::max());
			return;
		}

		// build list of all peers that are
		// unchokable.
		// TODO: 3 there should be a pre-calculated list of all peers eligible for
		// unchoking
		std::vector<peer_connection*> peers;
		for (auto i = m_connections.begin(); i != m_connections.end();)
		{
			std::shared_ptr<peer_connection> p = *i;
			TORRENT_ASSERT(p);
			++i;
			torrent* const t = p->associated_torrent().lock().get();
			torrent_peer* const pi = p->peer_info_struct();

			if (p->ignore_unchoke_slots() || t == nullptr || pi == nullptr
				|| pi->web_seed || t->is_paused())
			{
				p->reset_choke_counters();
				continue;
			}

			if (!p->is_peer_interested()
				|| p->is_disconnecting()
				|| p->is_connecting())
			{
				// this peer is not unchokable. So, if it's unchoked
				// already, make sure to choke it.
				if (p->is_choked())
				{
					p->reset_choke_counters();
					continue;
				}
				if (pi && pi->optimistically_unchoked)
				{
					m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
					pi->optimistically_unchoked = false;
					// force a new optimistic unchoke
					m_optimistic_unchoke_time_scaler = 0;
					// TODO: post a message to have this happen
					// immediately instead of waiting for the next tick
				}
				t->choke_peer(*p);
				p->reset_choke_counters();
				continue;
			}

			peers.push_back(p.get());
		}

		int const allowed_upload_slots = unchoke_sort(peers
			, unchoke_interval, m_settings);

		if (m_settings.get_int(settings_pack::choking_algorithm) == settings_pack::fixed_slots_choker)
		{
			int const upload_slots = get_int_setting(settings_pack::unchoke_slots_limit);
			m_stats_counters.set_value(counters::num_unchoke_slots, upload_slots);
		}
		else
		{
			m_stats_counters.set_value(counters::num_unchoke_slots
				, allowed_upload_slots);
		}

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log("RECALCULATE UNCHOKE SLOTS: [ peers: %d "
				"eligible-peers: %d"
				" allowed-slots: %d ]"
				, int(m_connections.size())
				, int(peers.size())
				, allowed_upload_slots);
		}
#endif

		int const unchoked_counter_optimistic
			= int(m_stats_counters[counters::num_peers_up_unchoked_optimistic]);
		int const num_opt_unchoke = (unchoked_counter_optimistic == 0)
			? std::max(1, allowed_upload_slots / 5) : unchoked_counter_optimistic;

		int unchoke_set_size = allowed_upload_slots - num_opt_unchoke;

		// go through all the peers and unchoke the first ones and choke
		// all the other ones.
		for (auto* const p : peers)
		{
			TORRENT_ASSERT(p != nullptr);
			TORRENT_ASSERT(!p->ignore_unchoke_slots());

			// this will update the m_uploaded_at_last_unchoke
			p->reset_choke_counters();

			torrent* t = p->associated_torrent().lock().get();
			TORRENT_ASSERT(t);

			if (unchoke_set_size > 0)
			{
				// yes, this peer should be unchoked
				if (p->is_choked())
				{
					if (!t->unchoke_peer(*p))
						continue;
				}

				--unchoke_set_size;

				TORRENT_ASSERT(p->peer_info_struct());
				if (p->peer_info_struct()->optimistically_unchoked)
				{
					// force a new optimistic unchoke
					// since this one just got promoted into the
					// proper unchoke set
					m_optimistic_unchoke_time_scaler = 0;
					p->peer_info_struct()->optimistically_unchoked = false;
					m_stats_counters.inc_stats_counter(counters::num_peers_up_unchoked_optimistic, -1);
				}
			}
			else
			{
				// no, this peer should be choked
				TORRENT_ASSERT(p->peer_info_struct());
				if (!p->is_choked() && !p->peer_info_struct()->optimistically_unchoked)
					t->choke_peer(*p);
			}
		}
	}

	std::shared_ptr<torrent> session_impl::delay_load_torrent(info_hash_t const& info_hash
		, peer_connection* pc)
	{
#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto& e : m_ses_extensions[plugins_all_idx])
		{
			add_torrent_params p;
			if (e->on_unknown_torrent(info_hash, peer_connection_handle(pc->self()), p))
			{
				error_code ec;
				torrent_handle handle = add_torrent(std::move(p), ec);

				return handle.native_handle();
			}
		}
#else
		TORRENT_UNUSED(pc);
		TORRENT_UNUSED(info_hash);
#endif
		return std::shared_ptr<torrent>();
	}

	// the return value from this function is valid only as long as the
	// session is locked!
	std::weak_ptr<torrent> session_impl::find_torrent(info_hash_t const& info_hash) const
	{
		TORRENT_ASSERT(is_single_thread());

		torrent* i = nullptr;
		info_hash.for_each([&](sha1_hash const& ih, protocol_version)
		{
			if (i == nullptr) i = m_torrents.find(ih);
		});
#if TORRENT_USE_INVARIANT_CHECKS
		for (auto const& te : m_torrents)
		{
			TORRENT_ASSERT(te);
		}
#endif
		if (i != nullptr) return i->shared_from_this();
		return std::weak_ptr<torrent>();
	}

	void session_impl::insert_torrent(info_hash_t const& ih, std::shared_ptr<torrent> const& t)
	{
		m_torrents.insert(ih, t);
		t->added();
	}

	void session_impl::update_torrent_info_hash(std::shared_ptr<torrent> const& t
		, info_hash_t const& old_ih)
	{
		m_torrents.erase(old_ih);
		m_torrents.insert(t->torrent_file().info_hashes(), t);
	}

	void session_impl::set_queue_position(torrent* me, queue_position_t p)
	{
		queue_position_t const current_pos = me->queue_position();
		if (current_pos == p) return;

		if (p >= queue_position_t{0} && current_pos == no_pos)
		{
			// we're inserting the torrent into the download queue
			queue_position_t const last = m_download_queue.end_index();
			if (p >= last)
			{
				m_download_queue.push_back(me);
				me->set_queue_position_impl(last);
			}
			else
			{
				m_download_queue.insert(m_download_queue.begin() + static_cast<int>(p), me);
				for (queue_position_t i = p; i < m_download_queue.end_index(); ++i)
				{
					m_download_queue[i]->set_queue_position_impl(i);
				}
			}
		}
		else if (p < queue_position_t{})
		{
			// we're removing the torrent from the download queue
			TORRENT_ASSERT(current_pos >= queue_position_t{0});
			TORRENT_ASSERT(p == no_pos);
			TORRENT_ASSERT(m_download_queue[current_pos] == me);
			m_download_queue.erase(m_download_queue.begin() + static_cast<int>(current_pos));
			me->set_queue_position_impl(no_pos);
			for (queue_position_t i = current_pos; i < m_download_queue.end_index(); ++i)
			{
				m_download_queue[i]->set_queue_position_impl(i);
			}
		}
		else if (p < current_pos)
		{
			// we're moving the torrent up the queue
			torrent* tmp = me;
			for (queue_position_t i = p; i <= current_pos; ++i)
			{
				std::swap(m_download_queue[i], tmp);
				m_download_queue[i]->set_queue_position_impl(i);
			}
			TORRENT_ASSERT(tmp == me);
		}
		else if (p > current_pos)
		{
			// we're moving the torrent down the queue
			p = std::min(p, prev(m_download_queue.end_index()));
			for (queue_position_t i = current_pos; i < p; ++i)
			{
				m_download_queue[i] = m_download_queue[next(i)];
				m_download_queue[i]->set_queue_position_impl(i);
			}
			m_download_queue[p] = me;
			me->set_queue_position_impl(p);
		}

		trigger_auto_manage();
	}

#if !defined TORRENT_DISABLE_ENCRYPTION
	torrent const* session_impl::find_encrypted_torrent(sha1_hash const& info_hash
		, sha1_hash const& xor_mask)
	{
		sha1_hash obfuscated = info_hash;
		obfuscated ^= xor_mask;

		return m_torrents.find_obfuscated(obfuscated);
	}
#endif

#ifndef TORRENT_DISABLE_MUTABLE_TORRENTS
	std::vector<std::shared_ptr<torrent>> session_impl::find_collection(
		std::string const& collection) const
	{
		std::vector<std::shared_ptr<torrent>> ret;
		for (auto const& t : m_torrents)
		{
			if (!t) continue;
			std::vector<std::string> const& c = t->torrent_file().collections();
			if (std::find(c.begin(), c.end(), collection) == c.end()) continue;
			ret.push_back(t);
		}
		return ret;
	}
#endif //TORRENT_DISABLE_MUTABLE_TORRENTS

	namespace {

	// returns true if lhs is a better disconnect candidate than rhs
	bool compare_disconnect_torrent(std::shared_ptr<torrent> const& lhs
		, std::shared_ptr<torrent> const& rhs)
	{
		// a torrent with 0 peers is never a good disconnect candidate
		// since there's nothing to disconnect
		if ((lhs->num_peers() == 0) != (rhs->num_peers() == 0))
			return lhs->num_peers() != 0;

		// other than that, always prefer to disconnect peers from seeding torrents
		// in order to not harm downloading ones
		if (lhs->is_seed() != rhs->is_seed())
			return lhs->is_seed();

		return lhs->num_peers() > rhs->num_peers();
	}

	} // anonymous namespace

	std::weak_ptr<torrent> session_impl::find_disconnect_candidate_torrent() const
	{
		auto const i = std::min_element(m_torrents.begin(), m_torrents.end()
			, &compare_disconnect_torrent);

		TORRENT_ASSERT(i != m_torrents.end());
		if (i == m_torrents.end()) return std::shared_ptr<torrent>();

		return *i;
	}

#ifndef TORRENT_DISABLE_LOGGING
	bool session_impl::should_log() const
	{
		return m_alerts.should_post<log_alert>();
	}

	TORRENT_FORMAT(2,3)
	void session_impl::session_log(char const* fmt, ...) const noexcept try
	{
		if (!m_alerts.should_post<log_alert>()) return;

		va_list v;
		va_start(v, fmt);
		m_alerts.emplace_alert<log_alert>(fmt, v);
		va_end(v);
	}
	catch (std::exception const&) {}
#endif

	void session_impl::get_torrent_status(std::vector<torrent_status>* ret
		, std::function<bool(torrent_status const&)> const& pred
		, status_flags_t const flags) const
	{
		for (auto const& t : m_torrents)
		{
			if (t->is_aborted()) continue;
			torrent_status st;
			t->status(&st, flags);
			if (!pred(st)) continue;
			ret->push_back(std::move(st));
		}
	}

	void session_impl::refresh_torrent_status(std::vector<torrent_status>* ret
		, status_flags_t const flags) const
	{
		for (auto& st : *ret)
		{
			auto t = st.handle.m_torrent.lock();
			if (!t) continue;
			t->status(&st, flags);
		}
	}

	void session_impl::post_torrent_updates(status_flags_t const flags)
	{
		INVARIANT_CHECK;

		TORRENT_ASSERT(is_single_thread());

		std::vector<torrent*>& state_updates
			= m_torrent_lists[aux::session_impl::torrent_state_updates];

#if TORRENT_USE_ASSERTS
		m_posting_torrent_updates = true;
#endif

		std::vector<torrent_status> status;
		status.reserve(state_updates.size());

		// TODO: it might be a nice feature here to limit the number of torrents
		// to send in a single update. By just posting the first n torrents, they
		// would nicely be round-robined because the torrent lists are always
		// pushed back. Perhaps the status_update_alert could even have a fixed
		// array of n entries rather than a vector, to further improve memory
		// locality.
		for (auto& t : state_updates)
		{
			TORRENT_ASSERT(t->m_links[aux::session_impl::torrent_state_updates].in_list());
			status.emplace_back();
			// querying accurate download counters may require
			// the torrent to be loaded. Loading a torrent, and evicting another
			// one will lead to calling state_updated(), which screws with
			// this list while we're working on it, and break things
			t->status(&status.back(), flags);
			t->clear_in_state_update();
		}
		state_updates.clear();

#if TORRENT_USE_ASSERTS
		m_posting_torrent_updates = false;
#endif

		m_alerts.emplace_alert<state_update_alert>(std::move(status));
	}

	void session_impl::post_session_stats()
	{
		if (!m_posted_stats_header)
		{
			m_posted_stats_header = true;
			m_alerts.emplace_alert<session_stats_header_alert>();
		}
		m_disk_thread->update_stats_counters(m_stats_counters);

#ifndef TORRENT_DISABLE_DHT
		if (m_dht)
			m_dht->update_stats_counters(m_stats_counters);
#endif

		m_stats_counters.set_value(counters::limiter_up_queue
			, m_upload_rate.queue_size());
		m_stats_counters.set_value(counters::limiter_down_queue
			, m_download_rate.queue_size());

		m_stats_counters.set_value(counters::limiter_up_bytes
			, m_upload_rate.queued_bytes());
		m_stats_counters.set_value(counters::limiter_down_bytes
			, m_download_rate.queued_bytes());

		m_alerts.emplace_alert<session_stats_alert>(m_stats_counters);
	}

	void session_impl::post_dht_stats()
	{
#ifndef TORRENT_DISABLE_DHT
		std::vector<dht::dht_status> dht_stats;
		if (m_dht)
			dht_stats = m_dht->dht_status();

		if (dht_stats.empty())
		{
			// for backwards compatibility, still post an empty alert if we don't
			// have any active DHT nodes
			m_alerts.emplace_alert<dht_stats_alert>(std::vector<dht_routing_bucket>{}
				, std::vector<dht_lookup>{}, dht::node_id{}, udp::endpoint{});
		}
		else
		{
			for (auto& s : dht_stats)
			{
				m_alerts.emplace_alert<dht_stats_alert>(
					std::move(s.table), std::move(s.requests)
					, s.our_id, s.local_endpoint);
			}
		}
#endif
	}

	std::vector<torrent_handle> session_impl::get_torrents() const
	{
		std::vector<torrent_handle> ret;

		for (auto const& i : m_torrents)
		{
			if (i->is_aborted()) continue;
			ret.push_back(torrent_handle(i));
		}
		return ret;
	}

	torrent_handle session_impl::find_torrent_handle(sha1_hash const& info_hash)
	{
		return torrent_handle(find_torrent(info_hash_t(info_hash)));
	}

	void session_impl::async_add_torrent(add_torrent_params* params)
	{
		std::unique_ptr<add_torrent_params> holder(params);
		error_code ec;
		add_torrent(std::move(*params), ec);
	}

#ifndef TORRENT_DISABLE_EXTENSIONS
	void session_impl::add_extensions_to_torrent(
		std::shared_ptr<torrent> const& torrent_ptr, client_data_t const userdata)
	{
		for (auto& e : m_ses_extensions[plugins_all_idx])
		{
			std::shared_ptr<torrent_plugin> tp(e->new_torrent(
				torrent_ptr->get_handle(), userdata));
			if (tp) torrent_ptr->add_extension(std::move(tp));
		}
	}
#endif

	torrent_handle session_impl::add_torrent(add_torrent_params&& params
		, error_code& ec)
	{
		std::shared_ptr<torrent> torrent_ptr;

		// in case there's an error, make sure to abort the torrent before leaving
		// the scope
		auto abort_torrent = aux::scope_end([&]{ if (torrent_ptr) torrent_ptr->abort(); });

#ifndef TORRENT_DISABLE_EXTENSIONS
		auto extensions = std::move(params.extensions);
		auto const userdata = std::move(params.userdata);
#endif

		// copy the most important fields from params to pass back in the
		// add_torrent_alert
		add_torrent_params alert_params;
		alert_params.flags = params.flags;
		alert_params.ti = params.ti;
		alert_params.name = params.name;
		alert_params.save_path = params.save_path;
		alert_params.userdata = params.userdata;
		alert_params.trackerid = params.trackerid;

		auto const flags = params.flags;

		info_hash_t info_hash;
		bool added;
		std::tie(torrent_ptr, info_hash, added) = add_torrent_impl(std::move(params), ec);

		alert_params.info_hashes = info_hash;

		torrent_handle handle(torrent_ptr);
		m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);

		if (!torrent_ptr) return handle;

		TORRENT_ASSERT(info_hash.has_v1() || info_hash.has_v2());

#if TORRENT_ABI_VERSION == 1
		if (m_alerts.should_post<torrent_added_alert>())
			m_alerts.emplace_alert<torrent_added_alert>(handle);
#endif

		// if this was an existing torrent, we can't start it again, or add
		// another set of plugins etc. we're done
		if (!added)
		{
			abort_torrent.disarm();
			return handle;
		}

		torrent_ptr->set_ip_filter(m_ip_filter);
		torrent_ptr->start();

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto& ext : extensions)
		{
			std::shared_ptr<torrent_plugin> tp(ext(handle, userdata));
			if (tp) torrent_ptr->add_extension(std::move(tp));
		}

		add_extensions_to_torrent(torrent_ptr, userdata);
#endif

		TORRENT_ASSERT(info_hash == torrent_ptr->torrent_file().info_hashes());
		insert_torrent(info_hash, torrent_ptr);

		// once we successfully add the torrent, we can disarm the abort action
		abort_torrent.disarm();

		// recalculate auto-managed torrents sooner (or put it off)
		// if another torrent will be added within one second from now
		// we want to put it off again anyway. So that while we're adding
		// a boat load of torrents, we postpone the recalculation until
		// we're done adding them all (since it's kind of an expensive operation)
		if (flags & torrent_flags::auto_managed)
		{
			const int max_downloading = settings().get_int(settings_pack::active_downloads);
			const int max_seeds = settings().get_int(settings_pack::active_seeds);
			const int max_active = settings().get_int(settings_pack::active_limit);

			const int num_downloading
			= int(torrent_list(session_interface::torrent_downloading_auto_managed).size());
			const int num_seeds
			= int(torrent_list(session_interface::torrent_seeding_auto_managed).size());
			const int num_active = num_downloading + num_seeds;

			// there's no point in triggering the auto manage logic early if we
			// don't have a reason to believe anything will change. It's kind of
			// expensive.
			if ((num_downloading < max_downloading
				|| num_seeds < max_seeds)
				&& num_active < max_active)
			{
				trigger_auto_manage();
			}
		}

		return handle;
	}

	std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>
	session_impl::add_torrent_impl(add_torrent_params&& params, error_code& ec)
	{
		TORRENT_ASSERT(!params.save_path.empty());

		using ptr_t = std::shared_ptr<torrent>;
		using ret_t = std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>;

#if TORRENT_ABI_VERSION == 1
		if (string_begins_no_case("magnet:", params.url.c_str()))
		{
			parse_magnet_uri(params.url, params, ec);
			if (ec) return ret_t{ptr_t(), params.info_hashes, false};
			params.url.clear();
		}
#endif

		if (params.ti && !params.ti->is_valid())
		{
			ec = errors::no_metadata;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

		if (params.ti && params.ti->is_valid() && params.ti->num_files() == 0)
		{
			ec = errors::no_files_in_torrent;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

		if (params.ti
			&& ((params.info_hashes.has_v1() && params.info_hashes.v1 != params.ti->info_hashes().v1)
				|| (params.info_hashes.has_v2() && params.info_hashes.v2 != params.ti->info_hashes().v2)
			))
		{
			ec = errors::mismatching_info_hash;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

#ifndef TORRENT_DISABLE_DHT
		// add params.dht_nodes to the DHT, if enabled
		for (auto const& n : params.dht_nodes)
			add_dht_node_name(n);

		if (params.ti)
		{
			for (auto const& n : params.ti->nodes())
				add_dht_node_name(n);
		}
#endif

		INVARIANT_CHECK;

		if (is_aborted())
		{
			ec = errors::session_is_closing;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

		// figure out the info hash of the torrent and make sure
		// params.info_hashes is set correctly
		if (params.ti)
		{
			params.info_hashes = params.ti->info_hashes();
#if TORRENT_ABI_VERSION < 3
			params.info_hash = params.info_hashes.get_best();
#endif
		}

		if (!params.info_hashes.has_v1() && !params.info_hashes.has_v2())
		{
			ec = errors::missing_info_hash_in_uri;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

		// is the torrent already active?
		std::shared_ptr<torrent> torrent_ptr = find_torrent(params.info_hashes).lock();

		if (torrent_ptr)
		{
			if (!(params.flags & torrent_flags::duplicate_is_error))
				return ret_t{std::move(torrent_ptr), params.info_hashes, false};

			ec = errors::duplicate_torrent;
			return ret_t{ptr_t(), params.info_hashes, false};
		}

		// make sure we have enough memory in the torrent lists up-front,
		// since when torrents changes states, we cannot allocate memory that
		// might fail.
		size_t const num_torrents = m_torrents.size();
		for (auto& l : m_torrent_lists)
		{
			l.reserve(num_torrents + 1);
		}

		torrent_ptr = std::make_shared<torrent>(*this, m_paused, std::move(params));
		torrent_ptr->set_queue_position(m_download_queue.end_index());

		// it's fine to copy this moved-from info_hash_t object, since its move
		// construction is just a copy.
		return ret_t{std::move(torrent_ptr), params.info_hashes, true};
	}

	void session_impl::update_outgoing_interfaces()
	{
		std::string const net_interfaces = m_settings.get_str(settings_pack::outgoing_interfaces);

		// declared in string_util.hpp
		parse_comma_separated_string(net_interfaces, m_outgoing_interfaces);

#ifndef TORRENT_DISABLE_LOGGING
		if (!net_interfaces.empty() && m_outgoing_interfaces.empty())
		{
			session_log("ERROR: failed to parse outgoing interface list: %s"
				, net_interfaces.c_str());
		}
#endif
	}

	tcp::endpoint session_impl::bind_outgoing_socket(socket_type& s
		, address const& remote_address, error_code& ec) const
	{
		tcp::endpoint bind_ep(address_v4(), 0);
		if (m_settings.get_int(settings_pack::outgoing_port) > 0)
		{
#ifdef TORRENT_WINDOWS
			s.set_option(exclusive_address_use(true), ec);
#else
			s.set_option(tcp::acceptor::reuse_address(true), ec);
#endif
			// ignore errors because the underlying socket may not
			// be opened yet. This happens when we're routing through
			// a proxy. In that case, we don't yet know the address of
			// the proxy server, and more importantly, we don't know
			// the address family of its address. This means we can't
			// open the socket yet. The socks abstraction layer defers
			// opening it.
			ec.clear();
			bind_ep.port(std::uint16_t(next_port()));
		}

		if (is_utp(s))
		{
			// TODO: factor out this logic into a separate function for unit
			// testing

			utp_socket_impl* impl = nullptr;
			transport ssl = transport::plaintext;
#if TORRENT_USE_SSL
			if (std::get_if<ssl_stream<utp_stream>>(&s.var()) != nullptr)
			{
				impl = std::get<ssl_stream<utp_stream>>(s).next_layer().get_impl();
				ssl = transport::ssl;
			}
			else
#endif
				impl = std::get<utp_stream>(s).get_impl();

			std::vector<std::shared_ptr<listen_socket_t>> with_gateways;
			std::shared_ptr<listen_socket_t> match;
			for (auto const& ls : m_listen_sockets)
			{
				if (is_v4(ls->local_endpoint) != remote_address.is_v4()) continue;
				if (ls->ssl != ssl) continue;
				if (!(ls->flags & listen_socket_t::local_network))
					with_gateways.push_back(ls);

				if (match_addr_mask(ls->local_endpoint.address(), remote_address, ls->netmask))
				{
					// is this better than the previous match?
					match = ls;
				}
			}
			if (!match && !with_gateways.empty())
				match = with_gateways[random(std::uint32_t(with_gateways.size() - 1))];

			if (match)
			{
				impl->m_sock = match;
				return match->local_endpoint;
			}
			ec.assign(boost::system::errc::not_supported, generic_category());
			return {};
		}

		if (!m_outgoing_interfaces.empty())
		{
			if (m_interface_index >= m_outgoing_interfaces.size()) m_interface_index = 0;
			std::string const& ifname = m_outgoing_interfaces[m_interface_index++];

			bind_ep.address(bind_socket_to_device(m_io_context, s
				, remote_address.is_v4() ? tcp::v4() : tcp::v6()
				, ifname.c_str(), bind_ep.port(), ec));
			return bind_ep;
		}

		// if we're not binding to a specific interface, bind
		// to the same protocol family as the target endpoint
		if (bind_ep.address().is_unspecified())
		{
			if (remote_address.is_v6())
				bind_ep.address(address_v6::any());
			else
				bind_ep.address(address_v4::any());
		}

		s.bind(bind_ep, ec);
		return bind_ep;
	}

	// verify that ``addr``s interface allows incoming connections
	bool session_impl::verify_incoming_interface(address const& addr)
	{
		auto const iter = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
			, [&addr](std::shared_ptr<listen_socket_t> const& s)
			{ return s->local_endpoint.address() == addr; });
		return iter == m_listen_sockets.end()
			? false
			: bool((*iter)->flags & listen_socket_t::accept_incoming);
	}

	// verify that the given local address satisfies the requirements of
	// the outgoing interfaces. i.e. that one of the allowed outgoing
	// interfaces has this address. For uTP sockets, which are all backed
	// by an unconnected udp socket, we won't be able to tell what local
	// address is used for this peer's packets, in that case, just make
	// sure one of the allowed interfaces exists and maybe that it's the
	// default route. For systems that have SO_BINDTODEVICE, it should be
	// enough to just know that one of the devices exist
	bool session_impl::verify_bound_address(address const& addr, bool utp
		, error_code& ec)
	{
		TORRENT_UNUSED(utp);

		// we have specific outgoing interfaces specified. Make sure the
		// local endpoint for this socket is bound to one of the allowed
		// interfaces. the list can be a mixture of interfaces and IP
		// addresses.
		for (auto const& s : m_outgoing_interfaces)
		{
			error_code err;
			address const ip = make_address(s.c_str(), err);
			if (err) continue;
			if (ip == addr) return true;
		}

		// we didn't find the address as an IP in the interface list. Now,
		// resolve which device (if any) has this IP address.
		std::string const device = device_for_address(addr, m_io_context, ec);
		if (ec) return false;

		// if no device was found to have this address, we fail
		if (device.empty()) return false;

		return std::any_of(m_outgoing_interfaces.begin(), m_outgoing_interfaces.end()
			, [&device](std::string const& s) { return s == device; });
	}

	bool session_impl::has_lsd() const
	{
		return std::any_of(m_listen_sockets.begin(), m_listen_sockets.end()
			, [](std::shared_ptr<listen_socket_t> const& s) { return bool(s->lsd); });
	}

	void session_impl::remove_torrent(const torrent_handle& h
		, remove_flags_t const options)
	{
		INVARIANT_CHECK;

		std::shared_ptr<torrent> tptr = h.m_torrent.lock();
		if (!tptr) return;

		m_alerts.emplace_alert<torrent_removed_alert>(tptr->get_handle()
			, tptr->info_hash(), tptr->get_userdata());

		remove_torrent_impl(tptr, options);

		tptr->abort();
	}

	void session_impl::remove_torrent_impl(std::shared_ptr<torrent> tptr
		, remove_flags_t const options)
	{
		m_torrents.erase(tptr->torrent_file().info_hashes());

		torrent& t = *tptr;
		if (options)
		{
			if (!t.delete_files(options))
			{
				if (m_alerts.should_post<torrent_delete_failed_alert>())
					m_alerts.emplace_alert<torrent_delete_failed_alert>(t.get_handle()
						, error_code(), t.torrent_file().info_hashes());
			}
		}

		tptr->update_gauge();
		tptr->removed();

#ifndef TORRENT_DISABLE_DHT
		if (m_next_dht_torrent == m_torrents.size())
			m_next_dht_torrent = 0;
#endif
		if (m_next_lsd_torrent == m_torrents.size())
			m_next_lsd_torrent = 0;

		// this torrent may open up a slot for a queued torrent
		trigger_auto_manage();
	}

#if TORRENT_ABI_VERSION == 1

	void session_impl::update_ssl_listen()
	{
		INVARIANT_CHECK;

		// this function maps the previous functionality of just setting the ssl
		// listen port in order to enable the ssl listen sockets, to the new
		// mechanism where SSL sockets are specified in listen_interfaces.
		std::vector<std::string> ignore;
		auto current_ifaces = parse_listen_interfaces(
			m_settings.get_str(settings_pack::listen_interfaces), ignore);
		// these are the current interfaces we have, first remove all the SSL
		// interfaces
		current_ifaces.erase(std::remove_if(current_ifaces.begin(), current_ifaces.end()
			, std::bind(&listen_interface_t::ssl, _1)), current_ifaces.end());

		int const ssl_listen_port = m_settings.get_int(settings_pack::ssl_listen);

		// setting a port of 0 means to disable listening on SSL, so just update
		// the interface list with the new list, and we're done
		if (ssl_listen_port == 0)
		{
			m_settings.set_str(settings_pack::listen_interfaces
				, print_listen_interfaces(current_ifaces));
			return;
		}

		std::vector<listen_interface_t> new_ifaces;
		std::transform(current_ifaces.begin(), current_ifaces.end()
			, std::back_inserter(new_ifaces), [](listen_interface_t in)
			{ in.ssl = true; return in; });

		current_ifaces.insert(current_ifaces.end(), new_ifaces.begin(), new_ifaces.end());

		m_settings.set_str(settings_pack::listen_interfaces
			, print_listen_interfaces(current_ifaces));
	}
#endif // TORRENT_ABI_VERSION

	void session_impl::update_listen_interfaces()
	{
		INVARIANT_CHECK;

		std::string const net_interfaces = m_settings.get_str(settings_pack::listen_interfaces);
		std::vector<std::string> err;
		m_listen_interfaces = parse_listen_interfaces(net_interfaces, err);

		for (auto const& e : err)
		{
			m_alerts.emplace_alert<listen_failed_alert>(e, lt::address{}, 0
				, operation_t::parse_address, errors::invalid_port, lt::socket_type_t::tcp);
		}

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log("update listen interfaces: %s", net_interfaces.c_str());
			session_log("parsed listen interfaces count: %d, ifaces: %s"
				, int(m_listen_interfaces.size())
				, print_listen_interfaces(m_listen_interfaces).c_str());
		}
#endif
	}

	void session_impl::update_privileged_ports()
	{
		if (m_settings.get_bool(settings_pack::no_connect_privileged_ports))
		{
			m_port_filter.add_rule(0, 1024, port_filter::blocked);

			// Close connections whose endpoint is filtered
			// by the new ip-filter
			for (auto const& t : m_torrents)
				t->port_filter_updated();
		}
		else
		{
			m_port_filter.add_rule(0, 1024, 0);
		}
	}

	void session_impl::update_auto_sequential()
	{
		for (auto& i : m_torrents)
			i->update_auto_sequential();
	}

	void session_impl::update_max_failcount()
	{
		for (auto& i : m_torrents)
			i->update_max_failcount();
	}

	void session_impl::update_resolver_cache_timeout()
	{
		int const timeout = m_settings.get_int(settings_pack::resolver_cache_timeout);
		m_host_resolver.set_cache_timeout(seconds(timeout));
	}

	void session_impl::update_proxy()
	{
		for (auto& i : m_listen_sockets)
			i->udp_sock->sock.set_proxy_settings(proxy(), m_alerts);
	}

	void session_impl::update_ip_notifier()
	{
		if (m_settings.get_bool(settings_pack::enable_ip_notifier))
			start_ip_notifier();
		else
			stop_ip_notifier();
	}

	void session_impl::update_upnp()
	{
		if (m_settings.get_bool(settings_pack::enable_upnp))
			start_upnp();
		else
			stop_upnp();
	}

	void session_impl::update_natpmp()
	{
		if (m_settings.get_bool(settings_pack::enable_natpmp))
			start_natpmp();
		else
			stop_natpmp();
	}

	void session_impl::update_lsd()
	{
		if (m_settings.get_bool(settings_pack::enable_lsd))
			start_lsd();
		else
			stop_lsd();
	}

	void session_impl::update_dht()
	{
#ifndef TORRENT_DISABLE_DHT
		if (m_settings.get_bool(settings_pack::enable_dht))
		{
			if (!m_settings.get_str(settings_pack::dht_bootstrap_nodes).empty()
				&& m_dht_router_nodes.empty())
			{
				// if we have bootstrap nodes configured, make sure we initiate host
				// name lookups. once these complete, the DHT will be started.
				// they are tracked by m_outstanding_router_lookups
				update_dht_bootstrap_nodes();
			}
			else
			{
				start_dht();
			}
		}
		else
			stop_dht();
#endif
	}

	void session_impl::update_dht_bootstrap_nodes()
	{
#ifndef TORRENT_DISABLE_DHT
		if (!m_settings.get_bool(settings_pack::enable_dht)) return;

		std::string const& node_list = m_settings.get_str(settings_pack::dht_bootstrap_nodes);
		std::vector<std::pair<std::string, int>> nodes;
		parse_comma_separated_string_port(node_list, nodes);

#ifndef TORRENT_DISABLE_LOGGING
		if (!node_list.empty() && nodes.empty())
		{
			session_log("ERROR: failed to parse DHT bootstrap list: %s", node_list.c_str());
		}
#endif
		for (auto const& n : nodes)
			add_dht_router(n);
#endif
	}

	void session_impl::update_count_slow()
	{
		error_code ec;
		for (auto const& tp : m_torrents)
		{
			tp->on_inactivity_tick(ec);
		}
	}

	// TODO: 2 this function should be removed and users need to deal with the
	// more generic case of having multiple listen ports
	std::uint16_t session_impl::listen_port() const
	{
		return listen_port(nullptr);
	}

	std::uint16_t session_impl::listen_port(listen_socket_t* sock) const
	{
		if (m_listen_sockets.empty()) return 0;
		if (sock)
		{
			// if we're using a proxy, we won't be able to accept any TCP
			// connections. Not even uTP connections via the port we know about.
			// The DHT may use the implied port to make it work, but the port we
			// announce here has no relevance for that.
			if (sock->flags & listen_socket_t::proxy)
				return 0;

			if (!(sock->flags & listen_socket_t::accept_incoming))
				return 0;

			return std::uint16_t(sock->tcp_external_port());
		}

#ifdef TORRENT_SSL_PEERS
		for (auto const& s : m_listen_sockets)
		{
			if (!(s->flags & listen_socket_t::accept_incoming)) continue;
			if (s->ssl == transport::plaintext)
				return std::uint16_t(s->tcp_external_port());
		}
		return 0;
#else
		sock = m_listen_sockets.front().get();
		if (!(sock->flags & listen_socket_t::accept_incoming)) return 0;
		return std::uint16_t(sock->tcp_external_port());
#endif
	}

	// TODO: 2 this function should be removed and users need to deal with the
	// more generic case of having multiple ssl ports
	std::uint16_t session_impl::ssl_listen_port() const
	{
		return ssl_listen_port(nullptr);
	}

	std::uint16_t session_impl::ssl_listen_port(listen_socket_t* sock) const
	{
#ifdef TORRENT_SSL_PEERS
		if (sock)
		{
			if (!(sock->flags & listen_socket_t::accept_incoming)) return 0;
			return std::uint16_t(sock->tcp_external_port());
		}

		if (m_settings.get_int(settings_pack::proxy_type) != settings_pack::none)
			return 0;

		for (auto const& s : m_listen_sockets)
		{
			if (!(s->flags & listen_socket_t::accept_incoming)) continue;
			if (s->ssl == transport::ssl)
				return std::uint16_t(s->tcp_external_port());
		}
#else
		TORRENT_UNUSED(sock);
#endif
		return 0;
	}

	int session_impl::get_listen_port(transport const ssl, aux::listen_socket_handle const& s)
	{
		auto* socket = s.get();
		if (socket->ssl != ssl)
		{
			auto alt_socket = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
				, [&](std::shared_ptr<listen_socket_t> const& e)
			{
				return e->ssl == ssl
					&& e->external_address.external_address()
						== socket->external_address.external_address();
			});
			if (alt_socket != m_listen_sockets.end())
				socket = alt_socket->get();
		}
		return socket->udp_external_port();
	}

	int session_impl::listen_port(transport const ssl, address const& local_addr)
	{
		auto socket = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
			, [&](std::shared_ptr<listen_socket_t> const& e)
		{
			if (!(e->flags & listen_socket_t::accept_incoming)) return false;
			auto const& listen_addr = e->external_address.external_address();
			return e->ssl == ssl
				&& (listen_addr == local_addr
					|| (listen_addr.is_v4() == local_addr.is_v4() && listen_addr.is_unspecified()));
		});
		if (socket != m_listen_sockets.end())
			return (*socket)->tcp_external_port();
		return 0;
	}

	void session_impl::announce_lsd(sha1_hash const& ih, int port)
	{
		// use internal listen port for local peers
		for (auto const& s : m_listen_sockets)
		{
			if (s->lsd) s->lsd->announce(ih, port);
		}
	}

	void session_impl::on_lsd_peer(tcp::endpoint const& peer, sha1_hash const& ih)
	{
		m_stats_counters.inc_stats_counter(counters::on_lsd_peer_counter);
		TORRENT_ASSERT(is_single_thread());

		INVARIANT_CHECK;

		std::shared_ptr<torrent> t = find_torrent(info_hash_t(ih)).lock();
		if (!t) return;
		// don't add peers from lsd to private torrents
		if (t->torrent_file().priv() || (t->torrent_file().is_i2p()
			&& !m_settings.get_bool(settings_pack::allow_i2p_mixed))) return;

		protocol_version const v = ih == t->torrent_file().info_hashes().v1
			? protocol_version::V1 : protocol_version::V2;

		t->add_peer(peer, peer_info::lsd, v == protocol_version::V2 ? pex_lt_v2 : pex_flags_t(0));
#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			t->debug_log("lsd add_peer() [ %s ]"
				, peer.address().to_string().c_str());
		}
#endif
		t->do_connect_boost();

		if (m_alerts.should_post<lsd_peer_alert>())
			m_alerts.emplace_alert<lsd_peer_alert>(t->get_handle(), peer);
	}

	void session_impl::start_natpmp(std::shared_ptr<aux::listen_socket_t> const& s)
	{
		// don't create mappings for local IPv6 addresses
		// they can't be reached from outside of the local network anyways
		if (is_v6(s->local_endpoint) && is_local(s->local_endpoint.address()))
			return;

		if (!s->natpmp_mapper
			&& !(s->flags & listen_socket_t::local_network)
			&& !(s->flags & listen_socket_t::proxy))
		{
			// the natpmp constructor may fail and call the callbacks
			// into the session_impl.
			s->natpmp_mapper = std::make_shared<natpmp>(m_io_context, *this, listen_socket_handle(s));
			ip_interface ip;
			ip.interface_address = s->local_endpoint.address();
			ip.netmask = s->netmask;
			std::strncpy(ip.name, s->device.c_str(), sizeof(ip.name) - 1);
			ip.name[sizeof(ip.name) - 1] = '\0';
			s->natpmp_mapper->start(ip);
		}
	}

	void session_impl::on_port_mapping(port_mapping_t const mapping
		, address const& external_ip, int port
		, portmap_protocol const proto, error_code const& ec
		, portmap_transport const transport
		, listen_socket_handle const& ls)
	{
		TORRENT_ASSERT(is_single_thread());

		listen_socket_t* listen_socket = ls.get();

		// NOTE: don't assume that if ec != 0, the rest of the logic
		// is not necessary, the ports still need to be set, in other
		// words, don't early return without careful review of the
		// remaining logic
		if (ec && m_alerts.should_post<portmap_error_alert>())
		{
			m_alerts.emplace_alert<portmap_error_alert>(mapping
				, transport, ec, listen_socket ? listen_socket->local_endpoint.address() : address());
		}

		if (!listen_socket) return;

		if (!ec && !external_ip.is_unspecified())
		{
			// TODO: 1 report the proper address of the router as the source IP of
			// this vote of our external address, instead of the empty address
			listen_socket->external_address.cast_vote(external_ip, source_router, address());
		}

		if (proto == portmap_protocol::tcp) listen_socket->tcp_port_mapping[transport].port = port;
		else if (proto == portmap_protocol::udp) listen_socket->udp_port_mapping[transport].port = port;

		if (!ec && m_alerts.should_post<portmap_alert>())
		{
			m_alerts.emplace_alert<portmap_alert>(mapping, port
				, transport, proto, listen_socket->local_endpoint.address());
		}
	}

#if TORRENT_ABI_VERSION == 1
	session_status session_impl::status() const
	{
//		INVARIANT_CHECK;
		TORRENT_ASSERT(is_single_thread());

		session_status s;

		s.optimistic_unchoke_counter = m_optimistic_unchoke_time_scaler;
		s.unchoke_counter = m_unchoke_time_scaler;
		s.num_dead_peers = int(m_undead_peers.size());

		s.num_peers = int(m_stats_counters[counters::num_peers_connected]);
		s.num_unchoked = int(m_stats_counters[counters::num_peers_up_unchoked_all]);
		s.allowed_upload_slots = int(m_stats_counters[counters::num_unchoke_slots]);

		s.num_torrents
			= int(m_stats_counters[counters::num_checking_torrents]
			+ m_stats_counters[counters::num_stopped_torrents]
			+ m_stats_counters[counters::num_queued_seeding_torrents]
			+ m_stats_counters[counters::num_queued_download_torrents]
			+ m_stats_counters[counters::num_upload_only_torrents]
			+ m_stats_counters[counters::num_downloading_torrents]
			+ m_stats_counters[counters::num_seeding_torrents]
			+ m_stats_counters[counters::num_error_torrents]);

		s.num_paused_torrents
			= int(m_stats_counters[counters::num_stopped_torrents]
			+ m_stats_counters[counters::num_error_torrents]
			+ m_stats_counters[counters::num_queued_seeding_torrents]
			+ m_stats_counters[counters::num_queued_download_torrents]);

		s.total_redundant_bytes = m_stats_counters[counters::recv_redundant_bytes];
		s.total_failed_bytes = m_stats_counters[counters::recv_failed_bytes];

		s.up_bandwidth_queue = int(m_stats_counters[counters::limiter_up_queue]);
		s.down_bandwidth_queue = int(m_stats_counters[counters::limiter_down_queue]);

		s.up_bandwidth_bytes_queue = int(m_stats_counters[counters::limiter_up_bytes]);
		s.down_bandwidth_bytes_queue = int(m_stats_counters[counters::limiter_down_bytes]);

		s.disk_write_queue = int(m_stats_counters[counters::num_peers_down_disk]);
		s.disk_read_queue = int(m_stats_counters[counters::num_peers_up_disk]);

		s.has_incoming_connections = m_stats_counters[counters::has_incoming_connections] != 0;

		// total
		s.download_rate = m_stat.download_rate();
		s.total_upload = m_stat.total_upload();
		s.upload_rate = m_stat.upload_rate();
		s.total_download = m_stat.total_download();

		// payload
		s.payload_download_rate = m_stat.transfer_rate(stat::download_payload);
		s.total_payload_download = m_stat.total_transfer(stat::download_payload);
		s.payload_upload_rate = m_stat.transfer_rate(stat::upload_payload);
		s.total_payload_upload = m_stat.total_transfer(stat::upload_payload);

		// IP-overhead
		s.ip_overhead_download_rate = m_stat.transfer_rate(stat::download_ip_protocol);
		s.total_ip_overhead_download = m_stats_counters[counters::recv_ip_overhead_bytes];
		s.ip_overhead_upload_rate = m_stat.transfer_rate(stat::upload_ip_protocol);
		s.total_ip_overhead_upload = m_stats_counters[counters::sent_ip_overhead_bytes];

		// tracker
		s.total_tracker_download = m_stats_counters[counters::recv_tracker_bytes];
		s.total_tracker_upload = m_stats_counters[counters::sent_tracker_bytes];

		// dht
		s.total_dht_download = m_stats_counters[counters::dht_bytes_in];
		s.total_dht_upload = m_stats_counters[counters::dht_bytes_out];

		// deprecated
		s.tracker_download_rate = 0;
		s.tracker_upload_rate = 0;
		s.dht_download_rate = 0;
		s.dht_upload_rate = 0;

#ifndef TORRENT_DISABLE_DHT
		if (m_dht)
		{
			m_dht->dht_status(s);
		}
		else
#endif
		{
			s.dht_nodes = 0;
			s.dht_node_cache = 0;
			s.dht_torrents = 0;
			s.dht_global_nodes = 0;
			s.dht_total_allocations = 0;
		}

		s.utp_stats.packet_loss = std::uint64_t(m_stats_counters[counters::utp_packet_loss]);
		s.utp_stats.timeout = std::uint64_t(m_stats_counters[counters::utp_timeout]);
		s.utp_stats.packets_in = std::uint64_t(m_stats_counters[counters::utp_packets_in]);
		s.utp_stats.packets_out = std::uint64_t(m_stats_counters[counters::utp_packets_out]);
		s.utp_stats.fast_retransmit = std::uint64_t(m_stats_counters[counters::utp_fast_retransmit]);
		s.utp_stats.packet_resend = std::uint64_t(m_stats_counters[counters::utp_packet_resend]);
		s.utp_stats.samples_above_target = std::uint64_t(m_stats_counters[counters::utp_samples_above_target]);
		s.utp_stats.samples_below_target = std::uint64_t(m_stats_counters[counters::utp_samples_below_target]);
		s.utp_stats.payload_pkts_in = std::uint64_t(m_stats_counters[counters::utp_payload_pkts_in]);
		s.utp_stats.payload_pkts_out = std::uint64_t(m_stats_counters[counters::utp_payload_pkts_out]);
		s.utp_stats.invalid_pkts_in = std::uint64_t(m_stats_counters[counters::utp_invalid_pkts_in]);
		s.utp_stats.redundant_pkts_in = std::uint64_t(m_stats_counters[counters::utp_redundant_pkts_in]);

		s.utp_stats.num_idle = int(m_stats_counters[counters::num_utp_idle]);
		s.utp_stats.num_syn_sent = int(m_stats_counters[counters::num_utp_syn_sent]);
		s.utp_stats.num_connected = int(m_stats_counters[counters::num_utp_connected]);
		s.utp_stats.num_fin_sent = int(m_stats_counters[counters::num_utp_fin_sent]);
		s.utp_stats.num_close_wait = int(m_stats_counters[counters::num_utp_close_wait]);

		// this loop is potentially expensive. It could be optimized by
		// simply keeping a global counter
		s.peerlist_size = std::accumulate(m_torrents.begin(), m_torrents.end(), 0
			, [](int const acc, std::shared_ptr<torrent> const& t)
			{ return acc + t->num_known_peers(); });

		return s;
	}
#endif // TORRENT_ABI_VERSION

#ifndef TORRENT_DISABLE_DHT

	void session_impl::start_dht()
	{
		INVARIANT_CHECK;

		stop_dht();

		if (!m_settings.get_bool(settings_pack::enable_dht)) return;

		// postpone starting the DHT if we're still resolving the DHT router
		if (m_outstanding_router_lookups > 0)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("not starting DHT, outstanding router lookups: %d"
				, m_outstanding_router_lookups);
#endif
			return;
		}

		if (m_abort)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("not starting DHT, aborting");
#endif
			return;
		}

#ifndef TORRENT_DISABLE_LOGGING
		session_log("starting DHT, running: %s, router lookups: %d"
			, m_dht ? "true" : "false", m_outstanding_router_lookups);
#endif

		// TODO: refactor, move the storage to dht_tracker
		m_dht_storage = m_dht_storage_constructor(m_settings);
		m_dht = std::make_shared<dht::dht_tracker>(
			static_cast<dht::dht_observer*>(this)
			, m_io_context
			, [this](aux::listen_socket_handle const& sock
				, udp::endpoint const& ep
				, span<char const> p
				, error_code& ec
				, udp_send_flags_t const flags)
				{ send_udp_packet_listen(sock, ep, p, ec, flags); }
			, m_settings
			, m_stats_counters
			, *m_dht_storage
			, std::move(m_dht_state));

		for (auto& s : m_listen_sockets)
		{
			if (s->ssl != transport::ssl
				&& !(s->flags & listen_socket_t::local_network))
			{
				m_dht->new_socket(s);
			}
		}

		for (auto const& n : m_dht_router_nodes)
		{
			m_dht->add_router_node(n);
		}

		for (auto const& n : m_dht_nodes)
		{
			m_dht->add_node(n);
		}
		m_dht_nodes.clear();
		m_dht_nodes.shrink_to_fit();

		auto cb = [this](
			std::vector<std::pair<dht::node_entry, std::string>> const&)
		{
			if (m_alerts.should_post<dht_bootstrap_alert>())
				m_alerts.emplace_alert<dht_bootstrap_alert>();
		};

		m_dht->start(cb);
	}

	void session_impl::stop_dht()
	{
#ifndef TORRENT_DISABLE_LOGGING
		session_log("about to stop DHT, running: %s", m_dht ? "true" : "false");
#endif

		if (m_dht)
		{
			m_dht->stop();
			m_dht.reset();
		}

		m_dht_storage.reset();
	}

#if TORRENT_ABI_VERSION <= 2
	void session_impl::set_dht_settings(dht::dht_settings const& settings)
	{
#define SET_BOOL(name) m_settings.set_bool(settings_pack::dht_ ## name, settings.name)
#define SET_INT(name) m_settings.set_int(settings_pack::dht_ ## name, settings.name)

		SET_INT(max_peers_reply);
		SET_INT(search_branching);
		SET_INT(max_fail_count);
		SET_INT(max_torrents);
		SET_INT(max_dht_items);
		SET_INT(max_peers);
		SET_INT(max_torrent_search_reply);
		SET_BOOL(restrict_routing_ips);
		SET_BOOL(restrict_search_ips);
		SET_BOOL(extended_routing_table);
		SET_BOOL(aggressive_lookups);
		SET_BOOL(privacy_lookups);
		SET_BOOL(enforce_node_id);
		SET_BOOL(ignore_dark_internet);
		SET_INT(block_timeout);
		SET_INT(block_ratelimit);
		SET_BOOL(read_only);
		SET_INT(item_lifetime);
		SET_INT(upload_rate_limit);
		SET_INT(sample_infohashes_interval);
		SET_INT(max_infohashes_sample_count);
#undef SET_BOOL
#undef SET_INT
		update_dht_upload_rate_limit();
	}

	dht::dht_settings session_impl::get_dht_settings() const
	{
		dht::dht_settings sett;
#define SET_BOOL(name) \
		sett.name = m_settings.get_bool( settings_pack::dht_ ## name )
#define SET_INT(name) \
		sett.name = m_settings.get_int( settings_pack::dht_ ## name )

		SET_INT(max_peers_reply);
		SET_INT(search_branching);
		SET_INT(max_fail_count);
		SET_INT(max_torrents);
		SET_INT(max_dht_items);
		SET_INT(max_peers);
		SET_INT(max_torrent_search_reply);
		SET_BOOL(restrict_routing_ips);
		SET_BOOL(restrict_search_ips);
		SET_BOOL(extended_routing_table);
		SET_BOOL(aggressive_lookups);
		SET_BOOL(privacy_lookups);
		SET_BOOL(enforce_node_id);
		SET_BOOL(ignore_dark_internet);
		SET_INT(block_timeout);
		SET_INT(block_ratelimit);
		SET_BOOL(read_only);
		SET_INT(item_lifetime);
		SET_INT(upload_rate_limit);
		SET_INT(sample_infohashes_interval);
		SET_INT(max_infohashes_sample_count);
#undef SET_BOOL
#undef SET_INT
		return sett;
	}
#endif

	void session_impl::set_dht_state(dht::dht_state&& state)
	{
		m_dht_state = std::move(state);
	}

	void session_impl::set_dht_storage(dht::dht_storage_constructor_type sc)
	{
		m_dht_storage_constructor = std::move(sc);
	}

#if TORRENT_ABI_VERSION == 1
	entry session_impl::dht_state() const
	{
		return m_dht ? dht::save_dht_state(m_dht->state()) : entry();
	}

	void session_impl::start_dht_deprecated(entry const& startup_state)
	{
		m_settings.set_bool(settings_pack::enable_dht, true);
		std::vector<char> tmp;
		bencode(std::back_inserter(tmp), startup_state);

		bdecode_node e;
		error_code ec;
		if (tmp.empty() || bdecode(&tmp[0], &tmp[0] + tmp.size(), e, ec) != 0)
			return;
		m_dht_state = dht::read_dht_state(e);
		start_dht();
	}
#endif

	void session_impl::add_dht_node_name(std::pair<std::string, int> const& node)
	{
		ADD_OUTSTANDING_ASYNC("session_impl::on_dht_name_lookup");
		m_host_resolver.async_resolve(node.first, resolver::abort_on_shutdown
			, std::bind(&session_impl::on_dht_name_lookup
				, this, _1, _2, node.second));
	}

	void session_impl::on_dht_name_lookup(error_code const& e
		, std::vector<address> const& addresses, int port)
	{
		COMPLETE_ASYNC("session_impl::on_dht_name_lookup");

		if (e)
		{
			if (m_alerts.should_post<dht_error_alert>())
				m_alerts.emplace_alert<dht_error_alert>(
					operation_t::hostname_lookup, e);
			return;
		}

		for (auto const& addr : addresses)
		{
			udp::endpoint ep(addr, std::uint16_t(port));
			add_dht_node(ep);
		}
	}

	void session_impl::add_dht_router(std::pair<std::string, int> const& node)
	{
		ADD_OUTSTANDING_ASYNC("session_impl::on_dht_router_name_lookup");
		++m_outstanding_router_lookups;
		m_host_resolver.async_resolve(node.first, resolver::abort_on_shutdown
			, std::bind(&session_impl::on_dht_router_name_lookup
				, this, _1, _2, node.second));
	}

	void session_impl::on_dht_router_name_lookup(error_code const& e
		, std::vector<address> const& addresses, int port)
	{
		COMPLETE_ASYNC("session_impl::on_dht_router_name_lookup");
		--m_outstanding_router_lookups;

		if (e)
		{
			if (m_alerts.should_post<dht_error_alert>())
				m_alerts.emplace_alert<dht_error_alert>(
					operation_t::hostname_lookup, e);

			if (m_outstanding_router_lookups == 0) start_dht();
			return;
		}


		for (auto const& addr : addresses)
		{
			// router nodes should be added before the DHT is started (and bootstrapped)
			udp::endpoint ep(addr, std::uint16_t(port));
			if (m_dht) m_dht->add_router_node(ep);
			m_dht_router_nodes.push_back(ep);
		}

		if (m_outstanding_router_lookups == 0) start_dht();
	}

	// callback for dht_immutable_get
	void session_impl::get_immutable_callback(sha1_hash target
		, dht::item const& i)
	{
		TORRENT_ASSERT(!i.is_mutable());
		m_alerts.emplace_alert<dht_immutable_item_alert>(target, i.value());
	}

	void session_impl::dht_get_immutable_item(sha1_hash const& target)
	{
		if (!m_dht) return;
		m_dht->get_item(target, std::bind(&session_impl::get_immutable_callback
			, this, target, _1));
	}

	// callback for dht_mutable_get
	void session_impl::get_mutable_callback(dht::item const& i
		, bool const authoritative)
	{
		TORRENT_ASSERT(i.is_mutable());
		m_alerts.emplace_alert<dht_mutable_item_alert>(i.pk().bytes
			, i.sig().bytes, i.seq().value
			, i.salt(), i.value(), authoritative);
	}

	// key is a 32-byte binary string, the public key to look up.
	// the salt is optional
	// TODO: 3 use public_key here instead of std::array
	void session_impl::dht_get_mutable_item(std::array<char, 32> key
		, std::string salt)
	{
		if (!m_dht) return;
		m_dht->get_item(dht::public_key(key.data()), std::bind(&session_impl::get_mutable_callback
			, this, _1, _2), std::move(salt));
	}

	namespace {

		void on_dht_put_immutable_item(aux::alert_manager& alerts, sha1_hash target, int num)
		{
			if (alerts.should_post<dht_put_alert>())
				alerts.emplace_alert<dht_put_alert>(target, num);
		}

		void on_dht_put_mutable_item(aux::alert_manager& alerts, dht::item const& i, int num)
		{
			if (alerts.should_post<dht_put_alert>())
			{
				dht::signature const sig = i.sig();
				dht::public_key const pk = i.pk();
				dht::sequence_number const seq = i.seq();
				std::string salt = i.salt();
				alerts.emplace_alert<dht_put_alert>(pk.bytes, sig.bytes
					, std::move(salt), seq.value, num);
			}
		}

		void put_mutable_callback(dht::item& i
			, std::function<void(entry&, std::array<char, 64>&
				, std::int64_t&, std::string const&)> cb)
		{
			entry value = i.value();
			dht::signature sig = i.sig();
			dht::public_key pk = i.pk();
			dht::sequence_number seq = i.seq();
			std::string salt = i.salt();
			cb(value, sig.bytes, seq.value, salt);
			i.assign(std::move(value), salt, seq, pk, sig);
		}

		void on_dht_get_peers(aux::alert_manager& alerts, sha1_hash info_hash, std::vector<tcp::endpoint> const& peers)
		{
			if (alerts.should_post<dht_get_peers_reply_alert>())
				alerts.emplace_alert<dht_get_peers_reply_alert>(info_hash, peers);
		}

		void on_direct_response(aux::alert_manager& alerts, client_data_t userdata, dht::msg const& msg)
		{
			if (msg.message.type() == bdecode_node::none_t)
				alerts.emplace_alert<dht_direct_response_alert>(userdata, msg.addr);
			else
				alerts.emplace_alert<dht_direct_response_alert>(userdata, msg.addr, msg.message);
		}

	} // anonymous namespace

	void session_impl::dht_put_immutable_item(entry const& data, sha1_hash target)
	{
		if (!m_dht) return;
		m_dht->put_item(data, std::bind(&on_dht_put_immutable_item, std::ref(m_alerts)
			, target, _1));
	}

	void session_impl::dht_put_mutable_item(std::array<char, 32> key
		, std::function<void(entry&, std::array<char,64>&
		, std::int64_t&, std::string const&)> cb
		, std::string salt)
	{
		if (!m_dht) return;
		m_dht->put_item(dht::public_key(key.data())
			, std::bind(&on_dht_put_mutable_item, std::ref(m_alerts), _1, _2)
			, std::bind(&put_mutable_callback, _1, std::move(cb)), salt);
	}

	void session_impl::dht_get_peers(sha1_hash const& info_hash)
	{
		if (!m_dht) return;
		m_dht->get_peers(info_hash, std::bind(&on_dht_get_peers, std::ref(m_alerts), info_hash, _1));
	}

	void session_impl::dht_announce(sha1_hash const& info_hash, int port, dht::announce_flags_t const flags)
	{
		if (!m_dht) return;
		m_dht->announce(info_hash, port, flags, std::bind(&on_dht_get_peers, std::ref(m_alerts), info_hash, _1));
	}

	void session_impl::dht_live_nodes(sha1_hash const& nid)
	{
		if (!m_dht) return;
		auto nodes = m_dht->live_nodes(nid);
		m_alerts.emplace_alert<dht_live_nodes_alert>(nid, nodes);
	}

	void session_impl::dht_sample_infohashes(udp::endpoint const& ep, sha1_hash const& target)
	{
		if (!m_dht) return;
		m_dht->sample_infohashes(ep, target, [this, ep](sha1_hash const& nid
			, time_duration const interval
			, int const num, std::vector<sha1_hash> samples
			, std::vector<std::pair<sha1_hash, udp::endpoint>> nodes)
		{
			m_alerts.emplace_alert<dht_sample_infohashes_alert>(nid
				, ep, interval, num, std::move(samples), std::move(nodes));
		});
	}

	void session_impl::dht_direct_request(udp::endpoint const& ep, entry& e, client_data_t userdata)
	{
		if (!m_dht) return;
		m_dht->direct_request(ep, e, std::bind(&on_direct_response, std::ref(m_alerts), userdata, _1));
	}

#endif

	bool session_impl::is_listening() const
	{
		return !m_listen_sockets.empty();
	}

	session_impl::~session_impl()
	{
		// since we're destructing the session, no more alerts will make it out to
		// the user. So stop posting them now
		m_alerts.set_alert_mask({});

		// this is not allowed to be the network thread!
//		TORRENT_ASSERT(is_not_thread());
// TODO: asserts that no outstanding async operations are still in flight

		// this can happen if we end the io_context run loop with an exception
		m_connections.clear();
		for (auto& t : m_torrents)
		{
			t->panic();
			t->abort();
		}
		m_torrents.clear();

#if defined TORRENT_ASIO_DEBUGGING
		FILE* f = fopen("wakeups.log", "w+");
		if (f != nullptr)
		{
			time_point m = min_time();
			if (!_wakeups.empty()) m = _wakeups[0].timestamp;
			time_point prev = m;
			std::uint64_t prev_csw = 0;
			if (!_wakeups.empty()) prev_csw = _wakeups[0].context_switches;
			std::fprintf(f, "abs. time\trel. time\tctx switch\tidle-wakeup\toperation\n");
			for (wakeup_t const& w : _wakeups)
			{
				bool const idle_wakeup = w.context_switches > prev_csw;
				std::fprintf(f, "%" PRId64 "\t%" PRId64 "\t%" PRId64 "\t%c\t%s\n"
					, total_microseconds(w.timestamp - m)
					, total_microseconds(w.timestamp - prev)
					, w.context_switches
					, idle_wakeup ? '*' : '.'
					, w.operation);
				prev = w.timestamp;
				prev_csw = w.context_switches;
			}
			fclose(f);
		}
#endif
	}

#if TORRENT_ABI_VERSION == 1
	int session_impl::max_connections() const
	{
		return m_settings.get_int(settings_pack::connections_limit);
	}

	int session_impl::max_uploads() const
	{
		return m_settings.get_int(settings_pack::unchoke_slots_limit);
	}

	void session_impl::set_local_download_rate_limit(int bytes_per_second)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::local_download_rate_limit, bytes_per_second);
		apply_settings_pack_impl(p);
	}

	void session_impl::set_local_upload_rate_limit(int bytes_per_second)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::local_upload_rate_limit, bytes_per_second);
		apply_settings_pack_impl(p);
	}

	void session_impl::set_download_rate_limit_depr(int bytes_per_second)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::download_rate_limit, bytes_per_second);
		apply_settings_pack_impl(p);
	}

	void session_impl::set_upload_rate_limit_depr(int bytes_per_second)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::upload_rate_limit, bytes_per_second);
		apply_settings_pack_impl(p);
	}

	void session_impl::set_max_connections(int limit)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::connections_limit, limit);
		apply_settings_pack_impl(p);
	}

	void session_impl::set_max_uploads(int limit)
	{
		INVARIANT_CHECK;
		settings_pack p;
		p.set_int(settings_pack::unchoke_slots_limit, limit);
		apply_settings_pack_impl(p);
	}

	int session_impl::local_upload_rate_limit() const
	{
		return upload_rate_limit(m_local_peer_class);
	}

	int session_impl::local_download_rate_limit() const
	{
		return download_rate_limit(m_local_peer_class);
	}

	int session_impl::upload_rate_limit_depr() const
	{
		return upload_rate_limit(m_global_class);
	}

	int session_impl::download_rate_limit_depr() const
	{
		return download_rate_limit(m_global_class);
	}
#endif // DEPRECATE


	namespace {
		template <typename Socket>
		void set_tos(Socket& s, int v, error_code& ec)
		{
#if defined IPV6_TCLASS
			if (is_v6(s.local_endpoint(ec)))
				s.set_option(traffic_class(char(v)), ec);
			else if (!ec)
#endif
				s.set_option(type_of_service(char(v)), ec);
		}
	}

	// TODO: 2 this should be factored into the udp socket, so we only have the
	// code once
	void session_impl::update_peer_tos()
	{
		int const tos = m_settings.get_int(settings_pack::peer_tos);
		for (auto const& l : m_listen_sockets)
		{
			if (l->sock)
			{
				error_code ec;
				set_tos(*l->sock, tos, ec);

#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log(">>> SET_TOS [ tcp (%s %d) tos: %x e: %s ]"
						, l->sock->local_endpoint().address().to_string().c_str()
						, l->sock->local_endpoint().port(), tos, ec.message().c_str());
				}
#endif
			}

			if (l->udp_sock)
			{
				error_code ec;
				set_tos(l->udp_sock->sock, tos, ec);

#ifndef TORRENT_DISABLE_LOGGING
				if (should_log())
				{
					session_log(">>> SET_TOS [ udp (%s %d) tos: %x e: %s ]"
						, l->udp_sock->sock.local_endpoint().address().to_string().c_str()
						, l->udp_sock->sock.local_port()
						, tos, ec.message().c_str());
				}
#endif
			}
		}
	}

	void session_impl::update_user_agent()
	{
		// replace all occurrences of '\n' with ' '.
		std::string agent = m_settings.get_str(settings_pack::user_agent);
		std::string::iterator i = agent.begin();
		while ((i = std::find(i, agent.end(), '\n'))
			!= agent.end())
			*i = ' ';
		m_settings.set_str(settings_pack::user_agent, agent);
	}

	void session_impl::update_unchoke_limit()
	{
		int const allowed_upload_slots = get_int_setting(settings_pack::unchoke_slots_limit);

		m_stats_counters.set_value(counters::num_unchoke_slots
			, allowed_upload_slots);

		if (m_settings.get_int(settings_pack::num_optimistic_unchoke_slots)
			>= allowed_upload_slots / 2)
		{
			if (m_alerts.should_post<performance_alert>())
				m_alerts.emplace_alert<performance_alert>(torrent_handle()
					, performance_alert::too_many_optimistic_unchoke_slots);
		}

		if (settings().get_int(settings_pack::choking_algorithm) != settings_pack::fixed_slots_choker)
			return;

		if (allowed_upload_slots == std::numeric_limits<int>::max())
		{
			// this means we're not applying upload slot limits, unchoke
			// everyone
			for (auto const& p : m_connections)
			{
				if (p->is_disconnecting()
					|| p->is_connecting()
					|| !p->is_choked()
					|| p->in_handshake()
					|| p->ignore_unchoke_slots()
					)
					continue;

				auto const t = p->associated_torrent().lock();
				t->unchoke_peer(*p);
			}
		}
		else
		{
			// trigger recalculating unchoke slots
			m_unchoke_time_scaler = 0;
		}
	}

	void session_impl::update_connection_speed()
	{
		if (m_settings.get_int(settings_pack::connection_speed) < 0)
			m_settings.set_int(settings_pack::connection_speed, 200);
	}

	void session_impl::update_alert_queue_size()
	{
		m_alerts.set_alert_queue_size_limit(m_settings.get_int(settings_pack::alert_queue_size));
	}

	bool session_impl::preemptive_unchoke() const
	{
		if (settings().get_int(settings_pack::choking_algorithm) != settings_pack::fixed_slots_choker) return false;
		return m_stats_counters[counters::num_peers_up_unchoked]
			< m_stats_counters[counters::num_unchoke_slots]
			|| m_settings.get_int(settings_pack::unchoke_slots_limit) < 0;
	}

	void session_impl::update_dht_upload_rate_limit()
	{
#ifndef TORRENT_DISABLE_DHT
		if (m_settings.get_int(settings_pack::dht_upload_rate_limit) > std::numeric_limits<int>::max() / 3)
		{
			m_settings.set_int(settings_pack::dht_upload_rate_limit, std::numeric_limits<int>::max() / 3);
		}
#endif
	}

	void session_impl::update_disk_threads()
	{
		if (m_settings.get_int(settings_pack::aio_threads) < 0)
			m_settings.set_int(settings_pack::aio_threads, 0);
		if (m_settings.get_int(settings_pack::hashing_threads) < 0)
			m_settings.set_int(settings_pack::hashing_threads, 0);
	}

	void session_impl::update_report_web_seed_downloads()
	{
		// if this flag changed, update all web seed connections
		bool report = m_settings.get_bool(settings_pack::report_web_seed_downloads);
		for (auto const& c : m_connections)
		{
			connection_type const type = c->type();
			if (type == connection_type::url_seed
				|| type == connection_type::http_seed)
				c->ignore_stats(!report);
		}
	}

	void session_impl::trigger_auto_manage()
	{
		if (m_pending_auto_manage || m_abort) return;

		// we recalculated auto-managed torrents less than a second ago,
		// put it off one second.
		if (time_now() - m_last_auto_manage < seconds(1))
		{
			m_auto_manage_time_scaler = 0;
			return;
		}
		m_pending_auto_manage = true;
		m_need_auto_manage = true;

		post(m_io_context, [this]{ wrap(&session_impl::on_trigger_auto_manage); });
	}

	void session_impl::on_trigger_auto_manage()
	{
		TORRENT_ASSERT(m_pending_auto_manage);
		if (!m_need_auto_manage || m_abort)
		{
			m_pending_auto_manage = false;
			return;
		}
		// don't clear m_pending_auto_manage until after we've
		// recalculated the auto managed torrents. The auto-managed
		// logic may trigger another auto-managed event otherwise
		recalculate_auto_managed_torrents();
		m_pending_auto_manage = false;
	}

	void session_impl::update_socket_buffer_size()
	{
		for (auto const& l : m_listen_sockets)
		{
			error_code ec;
			set_socket_buffer_size(l->udp_sock->sock, m_settings, ec);
#ifndef TORRENT_DISABLE_LOGGING
			if (ec && should_log())
			{
				session_log("listen socket buffer size [ udp %s:%d ] %s"
					, l->udp_sock->sock.local_endpoint().address().to_string().c_str()
					, l->udp_sock->sock.local_port(), print_error(ec).c_str());
			}
#endif
			ec.clear();
			set_socket_buffer_size(*l->sock, m_settings, ec);
#ifndef TORRENT_DISABLE_LOGGING
			if (ec && should_log())
			{
				session_log("listen socket buffer size [ tcp %s:%d] %s"
					, l->sock->local_endpoint().address().to_string().c_str()
					, l->sock->local_endpoint().port(), print_error(ec).c_str());
			}
#endif
		}
	}

	void session_impl::update_dht_announce_interval()
	{
#ifndef TORRENT_DISABLE_DHT
		if (!m_dht)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("not starting DHT announce timer: m_dht == nullptr");
#endif
			return;
		}

		m_dht_interval_update_torrents = int(m_torrents.size());

		if (m_abort)
		{
#ifndef TORRENT_DISABLE_LOGGING
			session_log("not starting DHT announce timer: m_abort set");
#endif
			return;
		}

		ADD_OUTSTANDING_ASYNC("session_impl::on_dht_announce");
		int delay = std::max(m_settings.get_int(settings_pack::dht_announce_interval)
			/ std::max(int(m_torrents.size()), 1), 1);
		m_dht_announce_timer.expires_after(seconds(delay));
		m_dht_announce_timer.async_wait([this](error_code const& e) {
			wrap(&session_impl::on_dht_announce, e); });
#endif
	}

#if TORRENT_ABI_VERSION == 1
	void session_impl::update_local_download_rate()
	{
		if (m_settings.get_int(settings_pack::local_download_rate_limit) < 0)
			m_settings.set_int(settings_pack::local_download_rate_limit, 0);
		set_download_rate_limit(m_local_peer_class
			, m_settings.get_int(settings_pack::local_download_rate_limit));
	}

	void session_impl::update_local_upload_rate()
	{
		if (m_settings.get_int(settings_pack::local_upload_rate_limit) < 0)
			m_settings.set_int(settings_pack::local_upload_rate_limit, 0);
		set_upload_rate_limit(m_local_peer_class
			, m_settings.get_int(settings_pack::local_upload_rate_limit));
	}
#endif

	void session_impl::update_download_rate()
	{
		if (m_settings.get_int(settings_pack::download_rate_limit) < 0)
			m_settings.set_int(settings_pack::download_rate_limit, 0);
		set_download_rate_limit(m_global_class
			, m_settings.get_int(settings_pack::download_rate_limit));
	}

	void session_impl::update_upload_rate()
	{
		if (m_settings.get_int(settings_pack::upload_rate_limit) < 0)
			m_settings.set_int(settings_pack::upload_rate_limit, 0);
		set_upload_rate_limit(m_global_class
			, m_settings.get_int(settings_pack::upload_rate_limit));
	}

	void session_impl::update_connections_limit()
	{
		int limit = m_settings.get_int(settings_pack::connections_limit);

		if (limit <= 0) limit = max_open_files();

		m_settings.set_int(settings_pack::connections_limit, limit);

		if (num_connections() > m_settings.get_int(settings_pack::connections_limit)
			&& !m_torrents.empty())
		{
			// if we have more connections that we're allowed, disconnect
			// peers from the torrents so that they are all as even as possible

			int to_disconnect = num_connections() - m_settings.get_int(settings_pack::connections_limit);

			int last_average = 0;
			int average = m_settings.get_int(settings_pack::connections_limit) / int(m_torrents.size());

			// the number of slots that are unused by torrents
			int extra = m_settings.get_int(settings_pack::connections_limit) % int(m_torrents.size());

			// run 3 iterations of this, then we're probably close enough
			for (int iter = 0; iter < 4; ++iter)
			{
				// the number of torrents that are above average
				int num_above = 0;
				for (auto const& t : m_torrents)
				{
					int const num = t->num_peers();
					if (num <= last_average) continue;
					if (num > average) ++num_above;
					if (num < average) extra += average - num;
				}

				// distribute extra among the torrents that are above average
				if (num_above == 0) num_above = 1;
				last_average = average;
				average += extra / num_above;
				if (extra == 0) break;
				// save the remainder for the next iteration
				extra = extra % num_above;
			}

			for (auto const& t : m_torrents)
			{
				int const num = t->num_peers();
				if (num <= average) continue;

				// distribute the remainder
				int my_average = average;
				if (extra > 0)
				{
					++my_average;
					--extra;
				}

				int const disconnect = std::min(to_disconnect, num - my_average);
				to_disconnect -= disconnect;
				t->disconnect_peers(disconnect, errors::too_many_connections);
			}
		}
	}

	void session_impl::update_alert_mask()
	{
		m_alerts.set_alert_mask(alert_category_t(
			static_cast<std::uint32_t>(m_settings.get_int(settings_pack::alert_mask))));
	}

	void session_impl::update_validate_https()
	{
#if TORRENT_USE_SSL
		auto const flags = m_settings.get_bool(settings_pack::validate_https_trackers)
			? ssl::context::verify_peer
				| ssl::context::verify_fail_if_no_peer_cert
				| ssl::context::verify_client_once
			: ssl::context::verify_none;
		error_code ec;
		m_ssl_ctx.set_verify_mode(flags, ec);
#endif
	}

	void session_impl::pop_alerts(std::vector<alert*>* alerts)
	{
		m_alerts.get_all(*alerts);
	}

#if TORRENT_ABI_VERSION == 1
	void session_impl::update_rate_limit_utp()
	{
		if (m_settings.get_bool(settings_pack::rate_limit_utp))
		{
			// allow the global or local peer class to limit uTP peers
			m_peer_class_type_filter.allow(peer_class_type_filter::utp_socket
				, m_global_class);
			m_peer_class_type_filter.allow(peer_class_type_filter::ssl_utp_socket
				, m_global_class);
		}
		else
		{
			// don't add the global or local peer class to limit uTP peers
			m_peer_class_type_filter.disallow(peer_class_type_filter::utp_socket
				, m_global_class);
			m_peer_class_type_filter.disallow(peer_class_type_filter::ssl_utp_socket
				, m_global_class);
		}
	}

	void session_impl::update_ignore_rate_limits_on_local_network()
	{
		init_peer_class_filter(
			m_settings.get_bool(settings_pack::ignore_limits_on_local_network));
	}

	// this function is called on the user's thread
	// not the network thread
	void session_impl::pop_alerts()
	{
		// if we don't have any alerts in our local cache, we have to ask
		// the alert_manager for more. It will swap our vector with its and
		// destruct eny left-over alerts in there.
		if (m_alert_pointer_pos >= int(m_alert_pointers.size()))
		{
			pop_alerts(&m_alert_pointers);
			m_alert_pointer_pos = 0;
		}
	}

	alert const* session_impl::pop_alert()
	{
		if (m_alert_pointer_pos >= int(m_alert_pointers.size()))
		{
			pop_alerts();
			if (m_alert_pointers.empty())
				return nullptr;
		}

		if (m_alert_pointers.empty()) return nullptr;

		// clone here to be backwards compatible, to make the client delete the
		// alert object
		return m_alert_pointers[m_alert_pointer_pos++];
	}

#endif

	alert* session_impl::wait_for_alert(time_duration max_wait)
	{
		return m_alerts.wait_for_alert(max_wait);
	}

#if TORRENT_ABI_VERSION == 1
	std::size_t session_impl::set_alert_queue_size_limit(std::size_t queue_size_limit_)
	{
		m_settings.set_int(settings_pack::alert_queue_size, int(queue_size_limit_));
		return std::size_t(m_alerts.set_alert_queue_size_limit(int(queue_size_limit_)));
	}
#endif

	void session_impl::start_ip_notifier()
	{
		INVARIANT_CHECK;

		if (m_ip_notifier) return;

		m_ip_notifier = create_ip_notifier(m_io_context);
		m_ip_notifier->async_wait([this](error_code const& e)
			{ wrap(&session_impl::on_ip_change, e); });
	}

	void session_impl::start_lsd()
	{
		INVARIANT_CHECK;

		for (auto& s : m_listen_sockets)
		{
			// we're not looking for local peers when we're using a proxy. We
			// want all traffic to go through the proxy
			if (s->flags & listen_socket_t::proxy) continue;
			if (s->lsd) continue;
			s->lsd = std::make_shared<lsd>(m_io_context, *this, s->local_endpoint.address()
				, s->netmask);
			error_code ec;
			s->lsd->start(ec);
			if (ec)
			{
				if (m_alerts.should_post<lsd_error_alert>())
					m_alerts.emplace_alert<lsd_error_alert>(ec, s->local_endpoint.address());
				s->lsd.reset();
			}
		}
	}

	void session_impl::start_natpmp()
	{
		INVARIANT_CHECK;
		for (auto& s : m_listen_sockets)
		{
			start_natpmp(s);
			remap_ports(remap_natpmp, *s);
		}
	}

	void session_impl::start_upnp()
	{
		INVARIANT_CHECK;
		for (auto const& s : m_listen_sockets)
		{
			start_upnp(s);
			remap_ports(remap_upnp, *s);
		}
	}

	void session_impl::start_upnp(std::shared_ptr<aux::listen_socket_t> const& s)
	{
		// until we support SSDP over an IPv6 network (
		// https://en.wikipedia.org/wiki/Simple_Service_Discovery_Protocol )
		// there's no point in starting upnp on one.
		if (is_v6(s->local_endpoint))
			return;

		// there's no point in starting the UPnP mapper for a network that isn't
		// connected to the internet. The whole point is to forward ports through
		// the gateway
		if ((s->flags & listen_socket_t::local_network)
			|| (s->flags & listen_socket_t::proxy))
			return;

		if (!s->upnp_mapper)
		{
			// the upnp constructor may fail and call the callbacks
			// into the session_impl.
			s->upnp_mapper = std::make_shared<upnp>(m_io_context, m_settings
				, *this, s->local_endpoint.address().to_v4(), s->netmask.to_v4(), s->device
				, listen_socket_handle(s));
			s->upnp_mapper->start();
		}
	}

	std::vector<port_mapping_t> session_impl::add_port_mapping(portmap_protocol const t
		, int const external_port
		, int const local_port)
	{
		std::vector<port_mapping_t> ret;
		for (auto& s : m_listen_sockets)
		{
			if (s->upnp_mapper) ret.push_back(s->upnp_mapper->add_mapping(t, external_port
				, tcp::endpoint(s->local_endpoint.address(), static_cast<std::uint16_t>(local_port))));
			if (s->natpmp_mapper) ret.push_back(s->natpmp_mapper->add_mapping(t, external_port
				, tcp::endpoint(s->local_endpoint.address(), static_cast<std::uint16_t>(local_port))));
		}
		return ret;
	}

	void session_impl::delete_port_mapping(port_mapping_t handle)
	{
		for (auto& s : m_listen_sockets)
		{
			if (s->upnp_mapper) s->upnp_mapper->delete_mapping(handle);
			if (s->natpmp_mapper) s->natpmp_mapper->delete_mapping(handle);
		}
	}

	void session_impl::stop_ip_notifier()
	{
		if (!m_ip_notifier) return;

		m_ip_notifier->cancel();
		m_ip_notifier.reset();
	}

	void session_impl::stop_lsd()
	{
		for (auto& s : m_listen_sockets)
		{
			if (!s->lsd) continue;
			s->lsd->close();
			s->lsd.reset();
		}
	}

	void session_impl::stop_natpmp()
	{
		for (auto& s : m_listen_sockets)
		{
			s->tcp_port_mapping[portmap_transport::natpmp] = listen_port_mapping();
			s->udp_port_mapping[portmap_transport::natpmp] = listen_port_mapping();
			if (!s->natpmp_mapper) continue;
			s->natpmp_mapper->close();
			s->natpmp_mapper.reset();
		}
	}

	void session_impl::stop_upnp()
	{
		for (auto& s : m_listen_sockets)
		{
			if (!s->upnp_mapper) continue;
			s->tcp_port_mapping[portmap_transport::upnp] = listen_port_mapping();
			s->udp_port_mapping[portmap_transport::upnp] = listen_port_mapping();
			s->upnp_mapper->close();
			s->upnp_mapper.reset();
		}
	}

	aux::external_ip session_impl::external_address() const
	{
		address ips[2][2];

		// take the first IP we find which matches each category
		for (auto const& i : m_listen_sockets)
		{
			address external_addr = i->external_address.external_address();
			if (ips[0][external_addr.is_v6()] == address())
				ips[0][external_addr.is_v6()] = external_addr;
			address local_addr = i->local_endpoint.address();
			if (ips[is_local(local_addr)][local_addr.is_v6()] == address())
				ips[is_local(local_addr)][local_addr.is_v6()] = local_addr;
		}

		return {ips[1][0], ips[0][0], ips[1][1], ips[0][1]};
	}

	// this is the DHT observer version. DHT is the implied source
	void session_impl::set_external_address(aux::listen_socket_handle const& iface
		, address const& ip, address const& source)
	{
		auto i = iface.m_sock.lock();
		TORRENT_ASSERT(i);
		if (!i) return;
		set_external_address(i, ip, source_dht, source);
	}

	void session_impl::get_peers(sha1_hash const& ih)
	{
		if (!m_alerts.should_post<dht_get_peers_alert>()) return;
		m_alerts.emplace_alert<dht_get_peers_alert>(ih);
	}

	void session_impl::announce(sha1_hash const& ih, address const& addr
		, int port)
	{
		if (!m_alerts.should_post<dht_announce_alert>()) return;
		m_alerts.emplace_alert<dht_announce_alert>(addr, port, ih);
	}

	void session_impl::outgoing_get_peers(sha1_hash const& target
		, sha1_hash const& sent_target, udp::endpoint const& ep)
	{
		if (!m_alerts.should_post<dht_outgoing_get_peers_alert>()) return;
		m_alerts.emplace_alert<dht_outgoing_get_peers_alert>(target, sent_target, ep);
	}

#ifndef TORRENT_DISABLE_LOGGING
	bool session_impl::should_log(module_t) const
	{
		return m_alerts.should_post<dht_log_alert>();
	}

	TORRENT_FORMAT(3,4)
	void session_impl::log(module_t m, char const* fmt, ...)
	{
		if (!m_alerts.should_post<dht_log_alert>()) return;

		va_list v;
		va_start(v, fmt);
		m_alerts.emplace_alert<dht_log_alert>(
			static_cast<dht_log_alert::dht_module_t>(m), fmt, v);
		va_end(v);
	}

	void session_impl::log_packet(message_direction_t dir, span<char const> pkt
		, udp::endpoint const& node)
	{
		if (!m_alerts.should_post<dht_pkt_alert>()) return;

		dht_pkt_alert::direction_t d = dir == dht::dht_logger::incoming_message
			? dht_pkt_alert::incoming : dht_pkt_alert::outgoing;

		m_alerts.emplace_alert<dht_pkt_alert>(pkt, d, node);
	}

	bool session_impl::should_log_portmap(portmap_transport) const
	{
		return m_alerts.should_post<portmap_log_alert>();
	}

	void session_impl::log_portmap(portmap_transport transport, char const* msg
		, listen_socket_handle const& ls) const
	{
		listen_socket_t const* listen_socket = ls.get();
		if (m_alerts.should_post<portmap_log_alert>())
			m_alerts.emplace_alert<portmap_log_alert>(transport, msg
				, listen_socket ? listen_socket->local_endpoint.address() : address());
	}

	bool session_impl::should_log_lsd() const
	{
		return m_alerts.should_post<log_alert>();
	}

	void session_impl::log_lsd(char const* msg) const
	{
		if (m_alerts.should_post<log_alert>())
			m_alerts.emplace_alert<log_alert>(msg);
	}
#endif

	bool session_impl::on_dht_request(string_view query
		, dht::msg const& request, entry& response)
	{
#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto const& ext : m_ses_extensions[plugins_dht_request_idx])
		{
			if (ext->on_dht_request(query
				, request.addr, request.message, response))
				return true;
		}
#else
		TORRENT_UNUSED(query);
		TORRENT_UNUSED(request);
		TORRENT_UNUSED(response);
#endif
		return false;
	}

	void session_impl::set_external_address(
		tcp::endpoint const& local_endpoint, address const& ip
		, ip_source_t const source_type, address const& source)
	{
		auto sock = std::find_if(m_listen_sockets.begin(), m_listen_sockets.end()
			, [&](std::shared_ptr<listen_socket_t> const& v)
			{ return v->local_endpoint.address() == local_endpoint.address(); });

		if (sock != m_listen_sockets.end())
			set_external_address(*sock, ip, source_type, source);
	}

	void session_impl::set_external_address(std::shared_ptr<listen_socket_t> const& sock
		, address const& ip, ip_source_t const source_type, address const& source)
	{
		if (!sock->external_address.cast_vote(ip, source_type, source)) return;

#ifndef TORRENT_DISABLE_LOGGING
		if (should_log())
		{
			session_log("external address updated for %s [ new-ip: %s type: %d last-voter: %s ]"
				, sock->device.empty() ? print_endpoint(sock->local_endpoint).c_str() : sock->device.c_str()
				, print_address(ip).c_str()
				, static_cast<std::uint8_t>(source_type)
				, print_address(source).c_str());
		}
#endif

		if (m_alerts.should_post<external_ip_alert>())
			m_alerts.emplace_alert<external_ip_alert>(ip);

		for (auto const& t : m_torrents)
		{
			t->new_external_ip();
		}

		// since we have a new external IP now, we need to
		// restart the DHT with a new node ID

#ifndef TORRENT_DISABLE_DHT
		if (m_dht) m_dht->update_node_id(sock);
#endif
	}

#if TORRENT_USE_INVARIANT_CHECKS
	void session_impl::check_invariant() const
	{
		TORRENT_ASSERT(is_single_thread());

		if (m_settings.get_int(settings_pack::unchoke_slots_limit) < 0
			&& m_settings.get_int(settings_pack::choking_algorithm) == settings_pack::fixed_slots_choker)
			TORRENT_ASSERT(m_stats_counters[counters::num_unchoke_slots] == std::numeric_limits<int>::max());

		for (torrent_list_index_t l{}; l != m_torrent_lists.end_index(); ++l)
		{
			std::vector<torrent*> const& list = m_torrent_lists[l];
			for (auto const& i : list)
			{
				TORRENT_ASSERT(i->m_links[l].in_list());
			}

			queue_position_t idx{};
			for (auto t : m_download_queue)
			{
				TORRENT_ASSERT(t->queue_position() == idx);
				++idx;
			}
		}

		int const num_gauges = counters::num_error_torrents - counters::num_checking_torrents + 1;
		aux::array<int, num_gauges> torrent_state_gauges;
		torrent_state_gauges.fill(0);

#if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
		std::unordered_set<queue_position_t> unique;
#endif

		int num_active_downloading = 0;
		int num_active_finished = 0;
		int total_downloaders = 0;
		for (auto const& tor : m_torrents)
		{
			std::shared_ptr<torrent> const& t = tor;
			if (t->want_peers_download()) ++num_active_downloading;
			if (t->want_peers_finished()) ++num_active_finished;
			TORRENT_ASSERT(!(t->want_peers_download() && t->want_peers_finished()));

			int const state = t->current_stats_state() - counters::num_checking_torrents;
			if (state != torrent::no_gauge_state)
			{
				++torrent_state_gauges[state];
			}

			queue_position_t const pos = t->queue_position();
			if (pos < queue_position_t{})
			{
				TORRENT_ASSERT(pos == no_pos);
				continue;
			}
			++total_downloaders;

#if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
			unique.insert(t->queue_position());
#endif
		}

		for (int i = 0, j = counters::num_checking_torrents;
			j < counters::num_error_torrents + 1; ++i, ++j)
		{
			TORRENT_ASSERT(torrent_state_gauges[i] == m_stats_counters[j]);
		}

#if defined TORRENT_EXPENSIVE_INVARIANT_CHECKS
		TORRENT_ASSERT(int(unique.size()) == total_downloaders);
#endif
		TORRENT_ASSERT(num_active_downloading == int(m_torrent_lists[torrent_want_peers_download].size()));
		TORRENT_ASSERT(num_active_finished == int(m_torrent_lists[torrent_want_peers_finished].size()));

		std::unordered_set<peer_connection*> unique_peers;

		int unchokes = 0;
		int unchokes_all = 0;
		int num_optimistic = 0;
		int disk_queue[2] = {0, 0};
		for (auto const& p : m_connections)
		{
			TORRENT_ASSERT(p);
			if (p->is_disconnecting()) continue;

			std::shared_ptr<torrent> t = p->associated_torrent().lock();
			TORRENT_ASSERT(unique_peers.find(p.get()) == unique_peers.end());
			unique_peers.insert(p.get());

			if (p->m_channel_state[0] & peer_info::bw_disk) ++disk_queue[0];
			if (p->m_channel_state[1] & peer_info::bw_disk) ++disk_queue[1];

			if (p->ignore_unchoke_slots())
			{
				if (!p->is_choked()) ++unchokes_all;
				continue;
			}
			if (!p->is_choked())
			{
				++unchokes;
				++unchokes_all;
			}

			if (p->peer_info_struct()
				&& p->peer_info_struct()->optimistically_unchoked)
			{
				++num_optimistic;
				TORRENT_ASSERT(!p->is_choked());
			}
		}

		for (auto const& p : m_undead_peers)
		{
			if (p->ignore_unchoke_slots())
			{
				if (!p->is_choked()) ++unchokes_all;
				continue;
			}
			if (!p->is_choked())
			{
				++unchokes_all;
				++unchokes;
			}

			if (p->peer_info_struct()
				&& p->peer_info_struct()->optimistically_unchoked)
			{
				++num_optimistic;
				TORRENT_ASSERT(!p->is_choked());
			}
		}

		TORRENT_ASSERT(disk_queue[peer_connection::download_channel]
			== m_stats_counters[counters::num_peers_down_disk]);
		TORRENT_ASSERT(disk_queue[peer_connection::upload_channel]
			== m_stats_counters[counters::num_peers_up_disk]);

		if (m_settings.get_int(settings_pack::num_optimistic_unchoke_slots))
		{
			TORRENT_ASSERT(num_optimistic <= m_settings.get_int(
				settings_pack::num_optimistic_unchoke_slots));
		}

		int const unchoked_counter_all = int(m_stats_counters[counters::num_peers_up_unchoked_all]);
		int const unchoked_counter = int(m_stats_counters[counters::num_peers_up_unchoked]);
		int const unchoked_counter_optimistic
			= int(m_stats_counters[counters::num_peers_up_unchoked_optimistic]);

		TORRENT_ASSERT_VAL(unchoked_counter_all == unchokes_all, unchokes_all);
		TORRENT_ASSERT_VAL(unchoked_counter == unchokes, unchokes);
		TORRENT_ASSERT_VAL(unchoked_counter_optimistic == num_optimistic, num_optimistic);

		for (auto const& te : m_torrents)
		{
			TORRENT_ASSERT(te);
		}
	}
#endif // TORRENT_USE_INVARIANT_CHECKS

#ifndef TORRENT_DISABLE_LOGGING
		tracker_logger::tracker_logger(session_interface& ses): m_ses(ses) {}
		void tracker_logger::tracker_warning(tracker_request const&
			, std::string const& str)
		{
			debug_log("*** tracker warning: %s", str.c_str());
		}

		void tracker_logger::tracker_response(tracker_request const&
			, libtorrent::address const& tracker_ip
			, std::list<address> const& tracker_ips
			, struct tracker_response const& resp)
		{
			TORRENT_UNUSED(tracker_ips);
			debug_log("TRACKER RESPONSE\n"
				"interval: %d\n"
				"external ip: %s\n"
				"we connected to: %s\n"
				"peers:"
				, resp.interval.count()
				, print_address(resp.external_ip).c_str()
				, print_address(tracker_ip).c_str());

			for (auto const& p : resp.peers)
			{
				debug_log("  %16s %5d %s", p.hostname.c_str(), p.port
					, p.pid.is_all_zeros() ? "" : to_hex(p.pid).c_str());
			}
			for (auto const& p : resp.peers4)
			{
				debug_log("  %s:%d", print_address(address_v4(p.ip)).c_str(), p.port);
			}
			for (auto const& p : resp.peers6)
			{
				debug_log("  [%s]:%d", print_address(address_v6(p.ip)).c_str(), p.port);
			}
		}

		void tracker_logger::tracker_request_error(tracker_request const&
			, error_code const& ec, operation_t const op, std::string const& str
			, seconds32 const retry_interval)
		{
			TORRENT_UNUSED(retry_interval);
			debug_log("*** tracker error: [%s] %s %s"
				, operation_name(op), ec.message().c_str(), str.c_str());
		}

		bool tracker_logger::should_log() const
		{
			return m_ses.alerts().should_post<log_alert>();
		}

		void tracker_logger::debug_log(const char* fmt, ...) const noexcept try
		{
			if (!m_ses.alerts().should_post<log_alert>()) return;

			va_list v;
			va_start(v, fmt);
			m_ses.alerts().emplace_alert<log_alert>(fmt, v);
			va_end(v);
		}
		catch (std::exception const&) {}
#endif // TORRENT_DISABLE_LOGGING
}}
